[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16560122#comment-16560122 ]

Aleksey Yeschenko commented on CASSANDRA-14556:
---

Committed to trunk as [47a12c52a313258307ab88392f75c5866d9a2bb1|https://github.com/apache/cassandra/commit/47a12c52a313258307ab88392f75c5866d9a2bb1] and to dtests as [d291b2b90326c62c2df8f49098c6deb915c16460|https://github.com/apache/cassandra-dtest/commit/d291b2b90326c62c2df8f49098c6deb915c16460]. Thanks all.

> Optimize streaming path in Cassandra
>
>
> Key: CASSANDRA-14556
> URL: https://issues.apache.org/jira/browse/CASSANDRA-14556
> Project: Cassandra
> Issue Type: Improvement
> Components: Streaming and Messaging
> Reporter: Dinesh Joshi
> Assignee: Dinesh Joshi
> Priority: Major
> Labels: Performance
> Fix For: 4.x
>
>
> During streaming, Cassandra reifies the sstables into objects. This creates
> unnecessary garbage and slows down the whole streaming process as some
> sstables can be transferred as a whole file rather than individual
> partitions. The objective of the ticket is to detect when a whole sstable can
> be transferred and skip the object reification. We can also use a zero-copy
> path to avoid bringing data into user-space on both sending and receiving
> side.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

-
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
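Editorial note: the "zero-copy path" in the ticket description can be illustrated with the standard `java.nio` call `FileChannel.transferTo`, which lets the operating system move file bytes to a target channel without staging them in a Java-side buffer where the platform supports it. The sketch below is only an illustration under that assumption; `WholeFileTransferSketch`, `transferWholeFile` and `roundTrip` are hypothetical names, not classes or methods from the Cassandra patch, and the target here is a local file rather than a network channel.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not a class from the Cassandra code base.
final class WholeFileTransferSketch
{
    // Sends every byte of 'source' to 'target' without staging it in a Java-side buffer.
    static long transferWholeFile(Path source, WritableByteChannel target)
    {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ))
        {
            long size = in.size();
            long position = 0;
            while (position < size)
            {
                // transferTo may move fewer bytes than requested, so keep going until done.
                long sent = in.transferTo(position, size - position, target);
                if (sent <= 0)
                    break; // nothing more could be transferred (for example, the file shrank)
                position += sent;
            }
            return position;
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    // Copies 'payload' through a pair of temporary files and returns what arrived.
    static byte[] roundTrip(byte[] payload)
    {
        try
        {
            Path src = Files.createTempFile("sketch-src", ".db");
            Path dst = Files.createTempFile("sketch-dst", ".db");
            try
            {
                Files.write(src, payload);
                try (FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE))
                {
                    transferWholeFile(src, out);
                }
                return Files.readAllBytes(dst);
            }
            finally
            {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        byte[] copied = roundTrip(new byte[]{ 1, 2, 3, 4 });
        System.out.println(copied.length + " bytes transferred");
    }
}
```

Whether the copy really bypasses user space depends on the operating system and on the kind of target channel, so this should be read as a sketch of the idea rather than a performance claim.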
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559632#comment-16559632 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user iamaleksey commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205746338

    --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
    @@ -18,51 +18,102 @@
     package org.apache.cassandra.db.streaming;
    +import java.io.File;
     import java.io.IOException;
    +import java.util.Collection;
    +import java.util.LinkedHashMap;
     import java.util.List;
     import java.util.Objects;
     import java.util.UUID;
     import com.google.common.annotations.VisibleForTesting;
     import com.google.common.base.Preconditions;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.db.ColumnFamilyStore;
    +import org.apache.cassandra.db.DecoratedKey;
    +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
    +import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
    +import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
    +import org.apache.cassandra.dht.Range;
    +import org.apache.cassandra.dht.Token;
    +import org.apache.cassandra.io.sstable.Component;
    +import org.apache.cassandra.io.sstable.KeyIterator;
     import org.apache.cassandra.io.sstable.format.SSTableReader;
     import org.apache.cassandra.io.util.DataOutputStreamPlus;
    +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
     import org.apache.cassandra.schema.TableId;
     import org.apache.cassandra.streaming.OutgoingStream;
     import org.apache.cassandra.streaming.StreamOperation;
     import org.apache.cassandra.streaming.StreamSession;
     import org.apache.cassandra.utils.concurrent.Ref;
    +import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper;
    +
     /**
      * used to transfer the part(or whole) of a SSTable data file
      */
     public class CassandraOutgoingFile implements OutgoingStream
     {
    +    private static final boolean isZeroCopySSTableTransfersEnabled = DatabaseDescriptor.isZeroCopySSTableTransfersEnabled();
    --- End diff --

    Can get rid of this field now that `DatabaseDescriptor.isZeroCopySSTableTransfersEnabled()` just reads a field from `Config`.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559633#comment-16559633 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user iamaleksey commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205747034

    --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
    @@ -114,11 +165,77 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version)
             CassandraStreamHeader.serializer.serialize(header, out, version);
             out.flush();
    -        CassandraStreamWriter writer = header.compressionInfo == null ?
    -                                       new CassandraStreamWriter(sstable, header.sections, session) :
    -                                       new CompressedCassandraStreamWriter(sstable, header.sections,
    -                                                                           header.compressionInfo, session);
    -        writer.write(out);
    +        if (shouldStreamEntireSSTable() && out instanceof ByteBufDataOutputStreamPlus)
    +        {
    +            CassandraBlockStreamWriter writer = new CassandraBlockStreamWriter(sstable, session, manifest);
    +            writer.write((ByteBufDataOutputStreamPlus) out);
    +        }
    +        else
    +        {
    +            CassandraStreamWriter writer = (header.compressionInfo == null) ?
    +                                           new CassandraStreamWriter(sstable, header.sections, session) :
    +                                           new CompressedCassandraStreamWriter(sstable, header.sections,
    +                                                                               header.compressionInfo, session);
    +            writer.write(out);
    +        }
    +    }
    +
    +    @VisibleForTesting
    +    public boolean shouldStreamEntireSSTable()
    +    {
    +        // don't stream if full sstable transfers are disabled or legacy counter shards are present
    +        if (!isZeroCopySSTableTransfersEnabled || ref.get().getSSTableMetadata().hasLegacyCounterShards)
    +            return false;
    +
    +        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId());
    +
    +        if (cfs == null)
    +            return false;
    +
    +        AbstractCompactionStrategy compactionStrategy = cfs.getCompactionStrategyManager()
    +                                                           .getCompactionStrategyFor(ref.get());
    +
    +        if (compactionStrategy instanceof LeveledCompactionStrategy)
    +            return contained(ranges, ref.get());
    +
    +        if (compactionStrategy instanceof SizeTieredCompactionStrategy)
    +        {
    +            return (ranges != null
    +                    && ranges.size() == 1
    +                    && ranges.get(0)
    +                             .contains(new Range<>(ref.get().first.getToken(),
    --- End diff --

    This should be `Bounds` for the sstable, since a `Range` is (first, last] while the sstable contains [first, last] (thanks @krummas).

    But on a higher-level note, maybe just stick to LCS only for now, and enable STCS after we commit CASSANDRA-14568? We are going to do that in 4.0, so this temporary special case won't make it to a release anyway ¯\_(ツ)_/¯
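Editorial note: the interval remark in the review comment above (a range is (first, last], while an sstable holds keys in [first, last]) can be made concrete with a small sketch. It uses plain `long` values in place of tokens and ignores wrap-around ranges; `IntervalSketch` and its methods are hypothetical names for illustration and are not Cassandra's `Range` or `Bounds` classes.

```java
// Hypothetical stand-ins for interval checks, with plain longs in place of tokens.
// Wrap-around ranges are deliberately ignored.
final class IntervalSketch
{
    // Membership in the half-open range (left, right].
    static boolean inRange(long left, long right, long token)
    {
        return left < token && token <= right;
    }

    // Naive check: treats the sstable as the range (first, last] and compares end points only.
    static boolean naiveCovers(long left, long right, long first, long last)
    {
        return left <= first && last <= right;
    }

    // Inclusive check: both end keys of the sstable, [first, last], must lie inside (left, right].
    static boolean boundsCovered(long left, long right, long first, long last)
    {
        return first <= last && inRange(left, right, first) && inRange(left, right, last);
    }

    public static void main(String[] args)
    {
        long first = 10, last = 20;   // sstable holds keys with tokens 10..20, both inclusive
        long left = 10, right = 20;   // requested range (10, 20] does not own token 10
        System.out.println("naive check:     " + naiveCovers(left, right, first, last));
        System.out.println("inclusive check: " + boundsCovered(left, right, first, last));
    }
}
```

With these inputs the naive end-point comparison accepts the sstable even though its first key falls outside the requested range, which is the off-by-one the comment warns about; the inclusive check rejects it.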
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559631#comment-16559631 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205745293 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; --- End diff -- Nit: import now unused. 
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559530#comment-16559530 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user iamaleksey commented on the issue:

    https://github.com/apache/cassandra/pull/239

    @dineshjoshi nice round of changes! I think we are there, or almost-almost there, at worst. Let me do one last pass before we go ahead and commit.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559118#comment-16559118 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on the issue:

    https://github.com/apache/cassandra/pull/239

    @iamaleksey made a few more changes -

    1. Got rid of `IStreamWriter`
    2. Ensured we're logging the configuration warning only once at start up iff zero copy streaming is enabled
    3. Few stylistic changes
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559116#comment-16559116 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205646170 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
    + */
    +
    +package org.apache.cassandra.db.streaming;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import com.google.common.collect.Iterators;
    +
    +import org.apache.cassandra.db.TypeSizes;
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.sstable.Component;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +
    +public final class ComponentManifest implements Iterable<Component>
    +{
    +    private final LinkedHashMap<Component, Long> components;
    +
    +    public ComponentManifest(Map<Component, Long> components)
    +    {
    +        this.components = new LinkedHashMap<>(components);
    +    }
    +
    +    public long sizeOf(Component component)
    +    {
    +        Long size = components.get(component);
    +        if (size == null)
    +            throw new IllegalArgumentException("Component " + component + " is not present in the manifest");
    +        return size;
    +    }
    +
    +    public long totalSize()
    +    {
    +        long totalSize = 0;
    +        for (Long size : components.values())
    +            totalSize += size;
    +        return totalSize;
    +    }
    +
    +    public List<Component> components()
    +    {
    +        return new ArrayList<>(components.keySet());
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +        if (this == o)
    +            return true;
    +
    +        if (!(o instanceof ComponentManifest))
    +            return false;
    +
    +        ComponentManifest that = (ComponentManifest) o;
    +        return components.equals(that.components);
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +        return components.hashCode();
    +    }
    +
    +    public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
    +    {
    +        public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
    +        {
    +            out.writeUnsignedVInt(manifest.components.size());
    +            for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
    +            {
    +                out.writeByte(entry.getKey().type.id);
    --- End diff --

    Done. I'm just using `component.name`. I think this should be sufficient for this PR's scope.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16559084#comment-16559084 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205639791 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); + +private final TableId tableId; +private final StreamSession session; +private final CassandraStreamHeader header; +private final int fileSequenceNumber; + +public CassandraBlockStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(messageHeader.pendingRepair)) +throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId)); +} + +this.header = streamHeader; +this.session = session; +this.tableId = messageHeader.tableId; +this.fileSequenceNumber = messageHeader.sequenceNumber; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); +if (cfs == null) +{ +// schema was dropped during streaming +throw new IOException("Table " + tableId + " was dropped during streaming"); +} + +ComponentManifest manifest = header.componentManifest; +long totalSize = manifest.totalSize(); + +logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", + session.planId(), + fileSequenceNumber, + session.peer, +
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558881#comment-16558881 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205599465 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
    + */
    +
    +package org.apache.cassandra.db.streaming;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import com.google.common.collect.Iterators;
    +
    +import org.apache.cassandra.db.TypeSizes;
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.sstable.Component;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +
    +public final class ComponentManifest implements Iterable<Component>
    +{
    +    private final LinkedHashMap<Component, Long> components;
    +
    +    public ComponentManifest(Map<Component, Long> components)
    +    {
    +        this.components = new LinkedHashMap<>(components);
    +    }
    +
    +    public long sizeOf(Component component)
    +    {
    +        Long size = components.get(component);
    +        if (size == null)
    +            throw new IllegalArgumentException("Component " + component + " is not present in the manifest");
    +        return size;
    +    }
    +
    +    public long totalSize()
    +    {
    +        long totalSize = 0;
    +        for (Long size : components.values())
    +            totalSize += size;
    +        return totalSize;
    +    }
    +
    +    public List<Component> components()
    +    {
    +        return new ArrayList<>(components.keySet());
    +    }
    +
    +    @Override
    +    public boolean equals(Object o)
    +    {
    +        if (this == o)
    +            return true;
    +
    +        if (!(o instanceof ComponentManifest))
    +            return false;
    +
    +        ComponentManifest that = (ComponentManifest) o;
    +        return components.equals(that.components);
    +    }
    +
    +    @Override
    +    public int hashCode()
    +    {
    +        return components.hashCode();
    +    }
    +
    +    public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
    +    {
    +        public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
    +        {
    +            out.writeUnsignedVInt(manifest.components.size());
    +            for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
    +            {
    +                out.writeByte(entry.getKey().type.id);
    --- End diff --

    FWIW, I realize that for most components this will be a bit redundant. Technically it's sufficient to just store `component.name`, and get the full `Component` via `Component.parse()`. If you don't like redundancy and want to do it that way, that's perfectly fine too - I'm cool with either option.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558844#comment-16558844 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205587936 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); + +private final TableId tableId; +private final StreamSession session; +private final CassandraStreamHeader header; +private final int fileSequenceNumber; + +public CassandraBlockStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(messageHeader.pendingRepair)) +throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId)); +} + +this.header = streamHeader; +this.session = session; +this.tableId = messageHeader.tableId; +this.fileSequenceNumber = messageHeader.sequenceNumber; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); +if (cfs == null) +{ +// schema was dropped during streaming +throw new IOException("Table " + tableId + " was dropped during streaming"); +} + +ComponentManifest manifest = header.componentManifest; +long totalSize = manifest.totalSize(); + +logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", + session.planId(), + fileSequenceNumber, + session.peer, +
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558840#comment-16558840 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205586651 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public final class ComponentManifest implements Iterable +{ +private final LinkedHashMap components; + +public ComponentManifest(Map components) +{ +this.components = new LinkedHashMap<>(components); +} + +public long sizeOf(Component component) +{ +Long size = components.get(component); +if (size == null) +throw new IllegalArgumentException("Component " + component + " is not present in the manifest"); +return size; +} + +public long totalSize() +{ +long totalSize = 0; +for (Long size : components.values()) +totalSize += size; +return totalSize; +} + +public List components() +{ +return new ArrayList<>(components.keySet()); +} + +@Override +public boolean equals(Object o) +{ +if (this == o) +return true; + +if (!(o instanceof ComponentManifest)) +return false; + +ComponentManifest that = (ComponentManifest) o; +return components.equals(that.components); +} + +@Override +public int hashCode() +{ +return components.hashCode(); +} + +public static final IVersionedSerializer serializer = new IVersionedSerializer() +{ +public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException +{ +out.writeUnsignedVInt(manifest.components.size()); +for (Map.Entry entry : manifest.components.entrySet()) +{ +out.writeByte(entry.getKey().type.id); --- End diff -- Talked to @dineshjoshi offline, and we realised that this is incomplete - and neither was my proposed version. 
For completeness, we want to serialize the whole component info, not just its type. It has two important fields: type and name. The name will usually be derived from the type, but not always. And even though we don't support streaming those components (custom and SI), we might want to change that in the future, and the protocol should allow it. So I suggest we encode `component.type.name()`, the full enum name, followed by `component.name()`. It's a little heavier, but this is completely irrelevant in the big picture, size-wise. The upside is that we can encode/decode any component necessary in the future, loss-free. And, again, we don't really need to assign ids. `valueOf()` is plenty good, and allows extension without overlap risk like in `Verb`. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue
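The encoding proposed in the comment above (enum name, then component name, decoded with `valueOf()`) can be sketched roughly as follows. This is only an illustration under stated assumptions: `ManifestCodecSketch`, its `Type` enum and its `Comp` class are hypothetical stand-ins for the real `Component` classes, and plain `java.io` data streams are used in place of Cassandra's `DataOutputPlus`/`DataInputPlus` and vint encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class ManifestCodecSketch
{
    // Hypothetical stand-in for Component.Type; not the real Cassandra enum.
    public enum Type { DATA, STATS, CUSTOM }

    // Hypothetical stand-in for Component: a type plus a name that is usually,
    // but not always, derivable from the type.
    public static final class Comp
    {
        public final Type type;
        public final String name;

        public Comp(Type type, String name) { this.type = type; this.name = name; }

        @Override
        public boolean equals(Object o)
        {
            return o instanceof Comp && ((Comp) o).type == type && ((Comp) o).name.equals(name);
        }

        @Override
        public int hashCode() { return 31 * type.hashCode() + name.hashCode(); }
    }

    // Writes: entry count, then (enum name, component name, size) per entry.
    public static byte[] encode(Map<Comp, Long> manifest)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeInt(manifest.size());
            for (Map.Entry<Comp, Long> entry : manifest.entrySet())
            {
                out.writeUTF(entry.getKey().type.name()); // full enum name, no numeric id
                out.writeUTF(entry.getKey().name);        // explicit name, so nothing is lost
                out.writeLong(entry.getValue());
            }
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the entries back in order; valueOf() resolves the type by name.
    public static Map<Comp, Long> decode(byte[] data)
    {
        Map<Comp, Long> manifest = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)))
        {
            int count = in.readInt();
            for (int i = 0; i < count; i++)
            {
                Type type = Type.valueOf(in.readUTF());
                String name = in.readUTF();
                manifest.put(new Comp(type, name), in.readLong());
            }
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return manifest;
    }
}
```

Because the type travels as a name rather than an id, adding a new enum constant later cannot collide with an existing id; the cost is a few extra bytes per component.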
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558616#comment-16558616 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532095 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.totalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", + session.planId(), + sstable.getFilename(), + session.peer, + sstable.getSSTableMetadata().repairedAt, + prettyPrintMemory(totalSize)); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.components()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), + sstable.getKeyspaceName(), + sstable.getColumnFamilyName(), + sstable.descriptor.generation, + component, length); --- End diff -- Fixed. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL:
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558615#comment-16558615 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532049 --- Diff: test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java --- @@ -43,8 +51,38 @@ public void serializerTest() new ArrayList<>(), ((CompressionMetadata) null), 0, - SerializationHeader.makeWithoutStats(metadata).toComponent()); + SerializationHeader.makeWithoutStats(metadata).toComponent(), + metadata.id); SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer); } + +@Test +public void serializerTest_FullSSTableTransfer() +{ +String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)"; +TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build(); + +ComponentManifest manifest = new ComponentManifest(new HashMap(ImmutableMap.of(Component.DATA, 100L))); --- End diff -- Fixed. Not sure why I did this in the first place. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. 
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558613#comment-16558613 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532001 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws I sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); int sstableLevel = in.readInt(); + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); -return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); +TableId tableId = TableId.deserialize(in); +boolean fullStream = in.readBoolean(); +ComponentManifest manifest = null; +DecoratedKey firstKey = null; + +if (fullStream) +{ +manifest = ComponentManifest.serializer.deserialize(in, version); +ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in); +IPartitioner partitioner = partitionerMapper.apply(tableId); +if (partitioner == null) +throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId {}", tableId)); --- End diff -- Fixed. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. 
This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558557#comment-16558557 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205526764 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -65,18 +85,43 @@ private CassandraStreamHeader(Version version, SSTableFormat.Type format, long e this.compressionInfo = compressionInfo; this.sstableLevel = sstableLevel; this.header = header; - +this.fullStream = fullStream; +this.componentManifest = componentManifest; +this.firstKey = firstKey; +this.tableId = tableId; this.size = calculateSize(); } -public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header) +private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, --- End diff -- I have cherry picked this change. Thanks! > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. 
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558442#comment-16558442 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205504297 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -65,18 +85,43 @@ private CassandraStreamHeader(Version version, SSTableFormat.Type format, long e this.compressionInfo = compressionInfo; this.sstableLevel = sstableLevel; this.header = header; - +this.fullStream = fullStream; +this.componentManifest = componentManifest; +this.firstKey = firstKey; +this.tableId = tableId; this.size = calculateSize(); } -public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header) +private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, --- End diff -- The introduction of the new fields and constructors got us to 5 constructors total with up to 10 arguments, which is no longer manageable, and calls for a builder. It's boring and tedious work, so I did it myself and pushed here - https://github.com/iamaleksey/cassandra/commit/321d21747faa46afcf34518ebdeb811f2a805de8 - please feel free to cherry-pick. In addition to introducing the builder, the commit renames `fullStream` to something a bit more meaningful (`isEntireSSTable`) that clearly reflects what's actually happening, fixes a bug in `serializedSize()` where compression info isn't initialized, and removes some fields without `toString()` implementations from header's own `toString()`. 
> Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side.
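The builder suggested in the review comment above can be illustrated with a minimal sketch. The class name `StreamHeaderSketch`, its reduced field set, and the `with...` method names are assumptions made for illustration; they are not taken from the linked commit. Only the `isEntireSSTable` name comes from the comment itself.

```java
public class StreamHeaderSketch
{
    // A deliberately small, hypothetical subset of the header's fields.
    public final String version;
    public final long estimatedKeys;
    public final int sstableLevel;
    public final boolean isEntireSSTable;

    // Single private constructor: callers go through the builder instead of
    // choosing between several telescoping constructors.
    private StreamHeaderSketch(Builder builder)
    {
        this.version = builder.version;
        this.estimatedKeys = builder.estimatedKeys;
        this.sstableLevel = builder.sstableLevel;
        this.isEntireSSTable = builder.isEntireSSTable;
    }

    public static Builder builder()
    {
        return new Builder();
    }

    public static final class Builder
    {
        private String version;
        private long estimatedKeys;
        private int sstableLevel;
        private boolean isEntireSSTable;

        public Builder withVersion(String version)
        {
            this.version = version;
            return this;
        }

        public Builder withEstimatedKeys(long estimatedKeys)
        {
            this.estimatedKeys = estimatedKeys;
            return this;
        }

        public Builder withSSTableLevel(int sstableLevel)
        {
            this.sstableLevel = sstableLevel;
            return this;
        }

        public Builder isEntireSSTable(boolean isEntireSSTable)
        {
            this.isEntireSSTable = isEntireSSTable;
            return this;
        }

        public StreamHeaderSketch build()
        {
            // Required fields are validated once, in one place.
            if (version == null)
                throw new IllegalStateException("version is required");
            return new StreamHeaderSketch(this);
        }
    }
}
```

Each call site then names the fields it sets, which is easier to review than a positional list of ten arguments, and optional fields simply keep their defaults.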
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558428#comment-16558428 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205445354 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws I sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); int sstableLevel = in.readInt(); + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); -return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); +TableId tableId = TableId.deserialize(in); +boolean fullStream = in.readBoolean(); +ComponentManifest manifest = null; +DecoratedKey firstKey = null; + +if (fullStream) +{ +manifest = ComponentManifest.serializer.deserialize(in, version); +ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in); +IPartitioner partitioner = partitionerMapper.apply(tableId); +if (partitioner == null) +throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId {}", tableId)); --- End diff -- Another instance of `String.format()` format string with `{}` instead of `%s`, looks like. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. 
This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side.
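The format-string problem flagged in the comment above is easy to reproduce in isolation. In the sketch below, the class and method names are hypothetical; the only behaviour relied on is that `String.format()` treats `{}` as literal text (it is SLF4J placeholder syntax) and silently ignores surplus arguments.

```java
public class FormatPlaceholderSketch
{
    // "{}" is not a java.util.Formatter specifier, so it is copied through
    // verbatim and the tableId argument is dropped without any error.
    public static String broken(Object tableId)
    {
        return String.format("Could not determine partitioner for tableId {}", tableId);
    }

    // "%s" is the specifier String.format() understands.
    public static String fixed(Object tableId)
    {
        return String.format("Could not determine partitioner for tableId %s", tableId);
    }
}
```

The broken variant still compiles and runs, which is why this kind of mistake tends to surface only when someone reads the resulting exception message.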
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558427#comment-16558427 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205416649 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -48,51 +47,61 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadataRef; +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter { +private static final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + private final TableMetadataRef metadata; -private final LifecycleTransaction txn; private volatile SSTableReader finalReader; private final Map componentWriters; -private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); - -private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() - .trickleFsync(false) - .bufferSize(2 * 1024 * 1024) - .bufferType(BufferType.OFF_HEAP) - .build(); -public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, - Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY, - Component.DIGEST, Component.CRC); +private static final SequentialWriterOption WRITER_OPTION = +SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 << 20) + .bufferType(BufferType.OFF_HEAP) + .build(); + +private static final ImmutableSet SUPPORTED_COMPONENTS = +ImmutableSet.of(Component.DATA, +Component.PRIMARY_INDEX, +Component.SUMMARY, +Component.STATS, +Component.COMPRESSION_INFO, +Component.FILTER, +Component.DIGEST, +Component.CRC); public BigTableBlockWriter(Descriptor descriptor, TableMetadataRef 
metadata, LifecycleTransaction txn, final Set components) { -super(descriptor, ImmutableSet.copyOf(components), metadata, - DatabaseDescriptor.getDiskOptimizationStrategy()); +super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy()); + txn.trackNew(this); this.metadata = metadata; -this.txn = txn; -this.componentWriters = new HashMap<>(components.size()); +this.componentWriters = new EnumMap<>(Component.Type.class); -assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", - new HashSet(components).removeAll(supportedComponents)); +if (!SUPPORTED_COMPONENTS.containsAll(components)) +throw new AssertionError(format("Unsupported streaming component detected %s", +Sets.difference(components, SUPPORTED_COMPONENTS))); --- End diff -- Neat. I either forgot, or didn't know that `Sets.difference()` was a thing. This is nicer than the way I proposed (: > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into
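The assertion message in the `BigTableBlockWriter` diff above changed because `Set.removeAll()` returns a `boolean`, not the remaining elements, so the old message would have printed `true` or `false` instead of the offending components. A minimal sketch using only `java.util` follows; the class and method names are hypothetical, components are modelled as strings, and the copy-then-`removeAll` helper is a stdlib stand-in for Guava's `Sets.difference()`, which returns a view rather than a copy.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class SetDifferenceSketch
{
    // Mirrors the removed code: removeAll() yields a boolean, so the
    // formatted message ends in "true" or "false".
    public static String brokenMessage(Set<String> components, Set<String> supported)
    {
        return String.format("Unsupported streaming component detected %s",
                             new LinkedHashSet<>(components).removeAll(supported));
    }

    // Computes the elements of components that are not in supported.
    public static Set<String> unsupported(Set<String> components, Set<String> supported)
    {
        Set<String> difference = new LinkedHashSet<>(components);
        difference.removeAll(supported);
        return difference;
    }

    public static String fixedMessage(Set<String> components, Set<String> supported)
    {
        return String.format("Unsupported streaming component detected %s",
                             unsupported(components, supported));
    }
}
```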
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558430#comment-16558430 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205450061 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.totalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", + session.planId(), + sstable.getFilename(), + session.peer, + sstable.getSSTableMetadata().repairedAt, + prettyPrintMemory(totalSize)); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.components()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), + sstable.getKeyspaceName(), + sstable.getColumnFamilyName(), + sstable.descriptor.generation, + component, length); --- End diff -- `prettyPrintMemory()` missing here for `length`. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 >
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558429#comment-16558429 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205445770 --- Diff: test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java --- @@ -43,8 +51,38 @@ public void serializerTest() new ArrayList<>(), ((CompressionMetadata) null), 0, - SerializationHeader.makeWithoutStats(metadata).toComponent()); + SerializationHeader.makeWithoutStats(metadata).toComponent(), + metadata.id); SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer); } + +@Test +public void serializerTest_FullSSTableTransfer() +{ +String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)"; +TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build(); + +ComponentManifest manifest = new ComponentManifest(new HashMap(ImmutableMap.of(Component.DATA, 100L))); --- End diff -- No need to wrap the immutable map in a hashmap here (: > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. 
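The extra `HashMap` wrapper questioned in the comment above is redundant because the `ComponentManifest` constructor quoted earlier in this thread already copies its argument into a new `LinkedHashMap`. A minimal sketch of that defensive copy, using a hypothetical `ManifestSketch` class with string keys in place of `Component`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ManifestSketch
{
    private final LinkedHashMap<String, Long> components;

    // The constructor takes its own copy, so callers may pass any Map,
    // including an immutable one, without wrapping it first.
    public ManifestSketch(Map<String, Long> components)
    {
        this.components = new LinkedHashMap<>(components);
    }

    public long totalSize()
    {
        long totalSize = 0;
        for (Long size : components.values())
            totalSize += size;
        return totalSize;
    }
}
```

Later changes to the caller's map do not affect the manifest, since the two no longer share storage.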
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16558047#comment-16558047 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I've addressed your comments including the one about disabling faster streaming for legacy counter shards. I did add a much less expensive check for STCS. It won't get all SSTables accurately but it is way cheaper than what I have for LCS. Let me know your thoughts. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556939#comment-16556939 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205339231 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; --- End diff -- I did rename this. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556934#comment-16556934 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205337510

    --- Diff: src/java/org/apache/cassandra/db/compaction/Verifier.java ---
    @@ -361,12 +361,26 @@ public RangeOwnHelper(List<Range<Token>> normalizedRanges)
              * @throws RuntimeException if the key is not contained
              */
             public void check(DecoratedKey key)
    +        {
    +            if (!checkBoolean(key))
    --- End diff --

    Thanks, I will bear that in mind for future patches.

> Optimize streaming path in Cassandra
>
>
>                 Key: CASSANDRA-14556
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-14556
>             Project: Cassandra
>          Issue Type: Improvement
>          Components: Streaming and Messaging
>            Reporter: Dinesh Joshi
>            Assignee: Dinesh Joshi
>            Priority: Major
>              Labels: Performance
>             Fix For: 4.x
>
>
> During streaming, Cassandra reifies the sstables into objects. This creates
> unnecessary garbage and slows down the whole streaming process as some
> sstables can be transferred as a whole file rather than individual
> partitions. The objective of the ticket is to detect when a whole sstable can
> be transferred and skip the object reification. We can also use a zero-copy
> path to avoid bringing data into user-space on both sending and receiving
> side.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org
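The Verifier hunk quoted above has check(DecoratedKey) delegate to a boolean-returning checkBoolean(key). A minimal sketch of that split follows, under loud assumptions: RangeOwnSketch, its long tokens, the (left, right] range convention and the exception message are all hypothetical stand-ins for illustration. This is not Cassandra's RangeOwnHelper, whose full body is not shown in this thread.

```java
import java.util.List;

/** Hypothetical stand-in for a range-ownership helper; NOT Cassandra's RangeOwnHelper. */
final class RangeOwnSketch
{
    // Assumption: each range is {left, right}, left exclusive and right inclusive.
    private final List<long[]> ranges;

    RangeOwnSketch(List<long[]> ranges)
    {
        this.ranges = ranges;
    }

    /** Non-throwing variant: reports whether the token falls inside any owned range. */
    boolean checkBoolean(long token)
    {
        for (long[] range : ranges)
            if (token > range[0] && token <= range[1])
                return true;
        return false;
    }

    /** Throwing variant layered on checkBoolean, mirroring the shape of the quoted hunk. */
    void check(long token)
    {
        if (!checkBoolean(token))
            throw new RuntimeException("Token " + token + " is not contained in the given ranges");
    }
}
```

Callers that only need a yes/no answer can use checkBoolean directly, while callers that treat an unowned key as an error keep calling check.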
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556442#comment-16556442 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205299016 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); --- End diff -- Yes, this *should* be the same. I avoided making that assumption and instead use what `writeToChannel()` returns. It is also used to update the `session.progress()`. Ideally, `bytesRead == length`, else we have some bug. I did rewrite this to make it
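The point made in the comment above, using the byte count returned by the transfer call rather than assuming it equals in.size(), can be sketched with plain java.nio. This is an illustration under stated assumptions, not the patch's code: TransferSketch and transferFully are hypothetical names, the target is assumed to be a blocking channel, and the internals of ByteBufDataOutputStreamPlus.writeToChannel (not shown in this thread) may differ.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;

/** Hypothetical helper; counts bytes actually transferred instead of trusting in.size(). */
final class TransferSketch
{
    static long transferFully(FileChannel in, WritableByteChannel out) throws IOException
    {
        long length = in.size();   // total length expected for this file
        long transferred = 0;      // bytes actually handed to the target channel

        while (transferred < length)
        {
            // transferTo may move fewer bytes than requested, so loop on its return value.
            long n = in.transferTo(transferred, length - transferred, out);
            if (n <= 0)
                break; // no progress (assumes a blocking target); the check below reports it
            transferred += n;
        }

        // Ideally transferred == length; anything else indicates a bug or a truncated file.
        if (transferred != length)
            throw new IOException("Transferred " + transferred + " bytes, expected " + length);
        return transferred;
    }
}
```

FileChannel.transferTo lets the operating system move bytes to the target without copying them through user space where the platform supports it, which is the kind of zero-copy path the ticket description refers to.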
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556436#comment-16556436 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205298316 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); --- End diff -- Fixed. 
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556431#comment-16556431 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205298028 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); +progress += bytesRead; + +session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesRead, + length); + +logger.debug("[Stream #{}] Finished block streaming {}.{}
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556430#comment-16556430 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205297956 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); +progress += bytesRead; + +session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesRead, + length); + +logger.debug("[Stream #{}] Finished block streaming {}.{}
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556429#comment-16556429 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205297558 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentManifest +{ +private final LinkedHashMap manifest; +private final Set components = new LinkedHashSet<>(Component.Type.values().length); +private final long totalSize; + +public ComponentManifest(Map componentManifest) +{ +this.manifest = new LinkedHashMap<>(componentManifest); + +long size = 0; +for (Map.Entry entry : this.manifest.entrySet()) +{ +size += entry.getValue(); +this.components.add(Component.parse(entry.getKey().repr)); +} + +this.totalSize = size; +} + +public Long getSizeForType(Component.Type type) +{ +return manifest.get(type); +} + +public long getTotalSize() +{ +return totalSize; +} + +public Set getComponents() +{ +return Collections.unmodifiableSet(components); +} + +public boolean equals(Object o) +{ +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; +ComponentManifest that = (ComponentManifest) o; +return totalSize == that.totalSize && + Objects.equals(manifest, that.manifest); +} + +public int hashCode() +{ + +return Objects.hash(manifest, totalSize); +} + +public static final IVersionedSerializer serializer = new IVersionedSerializer() +{ +public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException +{ +out.writeInt(manifest.manifest.size()); +for (Map.Entry entry : manifest.manifest.entrySet()) +serialize(entry.getKey(), entry.getValue(), out); +} + +public ComponentManifest deserialize(DataInputPlus in, int version) throws IOException +{ 
+LinkedHashMap components = new LinkedHashMap<>(Component.Type.values().length); + +int size = in.readInt(); +assert size >= 0 : "Invalid number of components"; + +for (int i = 0; i < size; i++) +{ +Component.Type type = Component.Type.fromRepresentation(in.readByte()); +long length = in.readLong(); +components.put(type, length); +} + +return new ComponentManifest(components); +} + +public long serializedSize(ComponentManifest manifest, int version) +{ +long size = 0; +size += TypeSizes.sizeof(manifest.manifest.size()); +for (Map.Entry entry : manifest.manifest.entrySet()) +{ +size += TypeSizes.sizeof(entry.getKey().id); +size +=
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556425#comment-16556425 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205297184

    --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db.streaming;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedHashSet;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +
    +import org.apache.cassandra.db.TypeSizes;
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.sstable.Component;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +
    +public class ComponentManifest
    +{
    +    private final LinkedHashMap<Component.Type, Long> manifest;
    +    private final Set<Component> components = new LinkedHashSet<>(Component.Type.values().length);
    --- End diff --

    Done.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556424#comment-16556424 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205297151

    --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java ---
    @@ -0,0 +1,129 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.db.streaming;
    +
    +import java.io.IOException;
    +import java.util.Collections;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedHashSet;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Set;
    +
    +import org.apache.cassandra.db.TypeSizes;
    +import org.apache.cassandra.io.IVersionedSerializer;
    +import org.apache.cassandra.io.sstable.Component;
    +import org.apache.cassandra.io.util.DataInputPlus;
    +import org.apache.cassandra.io.util.DataOutputPlus;
    +
    +public class ComponentManifest
    +{
    +    private final LinkedHashMap<Component.Type, Long> manifest;
    --- End diff --

    I had originally thought of going in this direction. I have rewritten this class and incorporated your suggestion including most stylistic changes.
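The ComponentManifest diff quoted in this thread keeps an insertion-ordered map of component sizes and serializes it as an entry count followed by one entry per component. A self-contained sketch of that wire shape is below. It is only an approximation under assumptions: ManifestSketch is a hypothetical name, components are keyed by a String name written with writeUTF, whereas the quoted code keys entries by Component.Type and reads a single byte per type, and plain java.io streams stand in for Cassandra's DataOutputPlus and DataInputPlus.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, simplified manifest codec; NOT the ComponentManifest serializer from the patch. */
final class ManifestSketch
{
    /** Writes the entry count, then (name, length) pairs in insertion order. */
    static byte[] serialize(LinkedHashMap<String, Long> manifest) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeInt(manifest.size());
            for (Map.Entry<String, Long> entry : manifest.entrySet())
            {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the count, validates it, then rebuilds the map in the same order. */
    static LinkedHashMap<String, Long> deserialize(byte[] data) throws IOException
    {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)))
        {
            int size = in.readInt();
            if (size < 0)
                throw new IOException("Invalid number of components: " + size);

            LinkedHashMap<String, Long> manifest = new LinkedHashMap<>();
            for (int i = 0; i < size; i++)
            {
                String name = in.readUTF();
                long length = in.readLong();
                manifest.put(name, length);
            }
            return manifest;
        }
    }

    /** Sum of all component lengths, analogous to getTotalSize() in the quoted diff. */
    static long totalSize(Map<String, Long> manifest)
    {
        long total = 0;
        for (long length : manifest.values())
            total += length;
        return total;
    }
}
```

A LinkedHashMap is used so that the receiver sees components in the same order the sender wrote them, which matters when the file contents follow the manifest on the same stream.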
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556363#comment-16556363 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205283093 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556361#comment-16556361 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205282784 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY,
+ Component.DIGEST, Component.CRC);
+
+public BigTableBlockWriter(Descriptor descriptor,
+ TableMetadataRef metadata,
+ LifecycleTransaction txn,
+ final Set components)
+{
+super(descriptor, ImmutableSet.copyOf(components), metadata,
+ DatabaseDescriptor.getDiskOptimizationStrategy());
+txn.trackNew(this);
+this.metadata = metadata;
+this.txn = txn;
--- End diff --

Removed.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556362#comment-16556362 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205282807 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
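The constructor in the diff above guards against components that the block writer cannot handle by asserting that the requested set is contained in supportedComponents. A minimal stand-alone sketch of that kind of check follows. Everything in it is an assumption made for illustration: the Kind enum is a hypothetical stand-in for Cassandra's Component constants (TOC is included only as an example of an entry outside the supported list), and the sketch throws an explicit exception where the patch uses assert, since an assert is only evaluated when the JVM runs with -ea.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch, not the Cassandra class: shows a "requested set must be
// a subset of the supported set" check with a readable error message.
final class ComponentCheck
{
    // Stand-in for the Component constants named in the diff; TOC is only here
    // as an example of something outside the supported list.
    enum Kind { DATA, PRIMARY_INDEX, STATS, COMPRESSION_INFO, FILTER, SUMMARY, DIGEST, CRC, TOC }

    static final Set<Kind> SUPPORTED = Collections.unmodifiableSet(EnumSet.of(
        Kind.DATA, Kind.PRIMARY_INDEX, Kind.STATS, Kind.COMPRESSION_INFO,
        Kind.FILTER, Kind.SUMMARY, Kind.DIGEST, Kind.CRC));

    // Returns the requested entries that are not supported (empty if all are fine).
    static Set<Kind> unsupported(Set<Kind> requested)
    {
        EnumSet<Kind> extra = EnumSet.noneOf(Kind.class);
        extra.addAll(requested);
        extra.removeAll(SUPPORTED);
        return extra;
    }

    // Explicit exception instead of assert, so the check also runs without -ea.
    static void validate(Set<Kind> requested)
    {
        Set<Kind> extra = unsupported(requested);
        if (!extra.isEmpty())
            throw new IllegalArgumentException(
                String.format("Unsupported streaming component detected %s", extra));
    }

    public static void main(String[] args)
    {
        validate(EnumSet.of(Kind.DATA, Kind.STATS));   // passes silently
        System.out.println(unsupported(EnumSet.of(Kind.DATA, Kind.TOC)));
    }
}
```

Computing the difference first keeps the error message specific to the offending entries rather than printing the whole requested set.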
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556345#comment-16556345 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205277660 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556342#comment-16556342 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205277492 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556344#comment-16556344 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205277562 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556343#comment-16556343 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205277545 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. + */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; --- End diff -- Cherry picked. 
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556341#comment-16556341 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205276519 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+private final TableId tableId;
+private final StreamSession session;
+private final int sstableLevel;
+private final SerializationHeader.Component header;
+private final int fileSeqNum;
+private final ComponentManifest manifest;
+private final SSTableFormat.Type format;
+private final Version version;
+private final DecoratedKey firstKey;
+
+public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair
+// sstables if the session has a pending repair id
+if (!session.getPendingRepair().equals(header.pendingRepair))
+throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.",
--- End diff --

Great catch. Yes, it used to be a logging call.
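For readers following the thread: the line under review passes an SLF4J-style {} placeholder to String.format, which only interprets %-style specifiers, so the table id would never appear in the exception message. The sketch below illustrates the difference in isolation. It is a hypothetical stand-alone class, not the Cassandra code; the method names and the plain UUID/Object parameters are assumptions standing in for the session and header fields shown in the diff.

```java
import java.util.UUID;

// Hypothetical sketch: contrasts "{}" (logger syntax) with "%s" (String.format
// syntax) using the message text quoted in the review.
final class PendingRepairCheck
{
    // "{}" means nothing to String.format; the extra argument is silently ignored.
    static String brokenMessage(Object tableId)
    {
        return String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", tableId);
    }

    // "%s" is the specifier String.format actually substitutes.
    static String fixedMessage(Object tableId)
    {
        return String.format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", tableId);
    }

    // Mirrors the shape of the guard in the diff: only compare ids when the
    // session has a pending repair id at all.
    static void validate(UUID sessionPendingRepair, UUID headerPendingRepair, Object tableId)
    {
        if (sessionPendingRepair != null && !sessionPendingRepair.equals(headerPendingRepair))
            throw new IllegalStateException(fixedMessage(tableId));
    }

    public static void main(String[] args)
    {
        System.out.println(brokenMessage("t1")); // placeholder stays literal
        System.out.println(fixedMessage("t1"));  // table id is substituted
    }
}
```

Because String.format raises no error for unused arguments, this kind of slip tends to survive compilation and tests that do not inspect the message text, which is presumably why it was only spotted in review.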
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556336#comment-16556336 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205276269 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556335#comment-16556335 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205276260 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556333#comment-16556333 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205276122 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
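The exception message in the constructor quoted above is built with `String.format`, which expects `%s`-style specifiers rather than the `{}` placeholders used by SLF4J loggers. The following is a minimal sketch of the difference; the class and method names are hypothetical and exist only for illustration, they are not part of the pull request.

```java
final class FormatPlaceholderSketch
{
    // "{}" is not a java.util.Formatter specifier, so it is copied through
    // verbatim and the extra argument is silently ignored.
    static String withBracePlaceholder(Object tableId)
    {
        return String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", tableId);
    }

    // "%s" is substituted with String.valueOf(tableId).
    static String withFormatSpecifier(Object tableId)
    {
        return String.format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", tableId);
    }

    public static void main(String[] args)
    {
        System.out.println(withBracePlaceholder("t1"));
        System.out.println(withFormatSpecifier("t1"));
    }
}
```

With a logger call the `{}` form would be the right one; the distinction only matters when the string goes through `String.format`.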
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556329#comment-16556329 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205275733

--- Diff: src/java/org/apache/cassandra/db/compaction/Verifier.java ---
@@ -361,12 +361,26 @@ public RangeOwnHelper(List<Range<Token>> normalizedRanges)
      * @throws RuntimeException if the key is not contained
      */
     public void check(DecoratedKey key)
+    {
+        if (!checkBoolean(key))
--- End diff --

Please don't avoid it - unless it's in public API. Change as many layers as necessary, in general, until the new code fits into the big picture. We allow and welcome it here.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16556326#comment-16556326 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205275130

--- Diff: src/java/org/apache/cassandra/db/compaction/Verifier.java ---
@@ -361,12 +361,26 @@ public RangeOwnHelper(List<Range<Token>> normalizedRanges)
      * @throws RuntimeException if the key is not contained
      */
     public void check(DecoratedKey key)
+    {
+        if (!checkBoolean(key))
--- End diff --

I am cherry-picking this change. The only concern I have is that we're now changing the signature and behavior of a pre-existing method. I generally try to avoid such changes to avoid confusing other authors.
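The hunk under discussion makes the throwing `check` delegate to a boolean-returning `checkBoolean`. A minimal sketch of that pattern is below. It is not the actual `RangeOwnHelper`: it assumes plain `long` tokens, a hypothetical `SimpleRange` type in place of `Range<Token>`, and a simple linear scan over the ranges.

```java
import java.util.Arrays;
import java.util.List;

final class RangeOwnSketch
{
    /** Hypothetical range (left, right], standing in for Range<Token>. */
    static final class SimpleRange
    {
        final long left;
        final long right;

        SimpleRange(long left, long right)
        {
            this.left = left;
            this.right = right;
        }

        boolean contains(long token)
        {
            return token > left && token <= right;
        }
    }

    private final List<SimpleRange> ranges;

    RangeOwnSketch(List<SimpleRange> ranges)
    {
        this.ranges = ranges;
    }

    /** Non-throwing variant for callers that only need a yes/no answer. */
    boolean checkBoolean(long token)
    {
        for (SimpleRange range : ranges)
            if (range.contains(token))
                return true;
        return false;
    }

    /** Throwing variant keeps its contract by delegating to checkBoolean. */
    void check(long token)
    {
        if (!checkBoolean(token))
            throw new RuntimeException("Token " + token + " is not contained in the given ranges");
    }

    public static void main(String[] args)
    {
        RangeOwnSketch helper = new RangeOwnSketch(Arrays.asList(new SimpleRange(0, 10), new SimpleRange(20, 30)));
        System.out.println(helper.checkBoolean(5));
        System.out.println(helper.checkBoolean(15));
    }
}
```

Keeping the throwing method as a thin wrapper means existing callers see the same exception behavior, while new callers (such as a streaming eligibility check) can branch on the boolean without using exceptions for control flow.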
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555913#comment-16555913 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205174576 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); +progress += bytesRead; + +session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesRead, + length); + +logger.debug("[Stream #{}] Finished block streaming {}.{} gen
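The loop quoted above opens each component file as a `FileChannel` and hands it to the output stream. As a rough illustration of channel-to-channel transfer in plain NIO, the sketch below copies a file into an arbitrary `WritableByteChannel` with `FileChannel.transferTo`. It is an assumption-laden stand-in: it does not reproduce `ByteBufDataOutputStreamPlus.writeToChannel`, it ignores rate limiting, and the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ChannelTransferSketch
{
    /** Transfers the whole file to the target channel and returns the number of bytes sent. */
    static long transferFully(Path file, WritableByteChannel target) throws IOException
    {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ))
        {
            long length = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until done.
            while (position < length)
                position += in.transferTo(position, length - position, target);
            return position;
        }
    }

    public static void main(String[] args) throws IOException
    {
        Path tmp = Files.createTempFile("component", ".db");
        try
        {
            Files.write(tmp, new byte[]{ 1, 2, 3, 4 });
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            long sent = transferFully(tmp, Channels.newChannel(sink));
            System.out.println(sent + " bytes transferred");
        }
        finally
        {
            Files.delete(tmp);
        }
    }
}
```

Whether the transfer actually avoids copying through user space depends on the target channel and the platform; with a socket channel the JVM can often use an OS-level primitive, while the in-memory sink used here cannot.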
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555864#comment-16555864 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205162501 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentManifest +{ +private final LinkedHashMap manifest; +private final Set components = new LinkedHashSet<>(Component.Type.values().length); --- End diff -- `.values()` allocates an array on each call, so any benefit we get from presizing the hashset is probably lost. Nor does it really matter here (:
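The point about `.values()` can be seen in isolation: the compiler-generated `values()` method of an enum returns a fresh copy of the constants array on every call. A small sketch, assuming a hypothetical `Type` enum in place of `Component.Type`, shows the allocation and one common way to avoid repeating it:

```java
import java.util.LinkedHashSet;
import java.util.Set;

final class EnumValuesSketch
{
    /** Hypothetical stand-in for Component.Type. */
    enum Type { DATA, PRIMARY_INDEX, STATS, FILTER }

    // Computed once at class initialization, so later sizing decisions
    // do not allocate a new array each time.
    private static final int TYPE_COUNT = Type.values().length;

    static Set<Type> newComponentSet()
    {
        return new LinkedHashSet<>(TYPE_COUNT);
    }

    public static void main(String[] args)
    {
        // Two calls yield two distinct array instances.
        System.out.println(Type.values() == Type.values()); // prints "false"
        System.out.println(newComponentSet().isEmpty());    // prints "true"
    }
}
```

As the reviewer notes, for a field initialized once per manifest this is unlikely to matter in practice; the cached constant is mainly relevant on hot paths.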
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555705#comment-16555705 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205114146 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555692#comment-16555692 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205108131 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555702#comment-16555702 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r20523 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555916#comment-16555916 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205174727 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); --- End diff -- Missed `prettyPrintMemory()` on `totalSize`? 
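A small sketch of the point behind this comment: a raw byte count in a debug line is hard to scan, so it helps to format it before logging. The helper below is a hypothetical stand-in written for illustration. The name `prettySize` and its output format are assumptions, not the behaviour of Cassandra's `prettyPrintMemory()`.

```java
import java.util.Locale;

class PrettySizeSketch
{
    // Binary units; anything above TiB is still reported in TiB.
    private static final String[] UNITS = { "B", "KiB", "MiB", "GiB", "TiB" };

    /** Hypothetical helper: renders a byte count with one decimal place and a binary unit. */
    static String prettySize(long bytes)
    {
        double value = bytes;
        int unit = 0;
        // Divide down until the value fits the current unit or we run out of units.
        while (value >= 1024 && unit < UNITS.length - 1)
        {
            value /= 1024;
            unit++;
        }
        return String.format(Locale.ROOT, "%.1f%s", value, UNITS[unit]);
    }

    public static void main(String[] args)
    {
        long totalSize = 2L * 1024 * 1024;
        // In the writer this string would replace the raw totalSize log argument.
        System.out.println("totalSize = " + prettySize(totalSize));
    }
}
```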
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555914#comment-16555914 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205175204 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); --- End diff -- So this should, essentially, be the same as `length`, correct?
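The expectation in this comment (bytes written for a component equal the file's length) can be sketched with plain NIO. This is an illustration under assumptions, not the PR's `writeToChannel` implementation: `transferFully` is a hypothetical name, rate limiting is left out, and the target is an ordinary `WritableByteChannel` rather than Cassandra's output stream.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class WholeFileTransferSketch
{
    /** Hypothetical helper: sends the whole file to target and returns the bytes written. */
    static long transferFully(FileChannel in, WritableByteChannel target) throws IOException
    {
        long length = in.size();
        long bytesWritten = 0;
        // transferTo may move fewer bytes than requested, so loop until the file is exhausted.
        while (bytesWritten < length)
        {
            long n = in.transferTo(bytesWritten, length - bytesWritten, target);
            if (n <= 0)
                break; // nothing moved: stop rather than spin; the caller sees a short count
            bytesWritten += n;
        }
        return bytesWritten;
    }

    public static void main(String[] args) throws IOException
    {
        Path src = Files.createTempFile("component", ".db");
        Path dst = Files.createTempFile("received", ".db");
        Files.write(src, new byte[128 * 1024]);
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE))
        {
            long written = transferFully(in, out);
            System.out.println("written == length: " + (written == in.size()));
        }
        Files.delete(src);
        Files.delete(dst);
    }
}
```

If the two values can only differ on failure, reporting `length` as both the progress and the total is equivalent on the success path, which is what the question is getting at.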
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555915#comment-16555915 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205175074 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; --- End diff -- Is this a variable left from some previous iteration of the class? We only update it once. Perhaps it should be called `bytesWritten` (I assume it was copy-pasted from the read impl?)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555917#comment-16555917 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205174453 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. + */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. 
+ * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); +progress += bytesRead; + +session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesRead, + length); + +logger.debug("[Stream #{}] Finished block streaming {}.{} gen
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555863#comment-16555863 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205163354 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentManifest +{ +private final LinkedHashMap manifest; +private final Set components = new LinkedHashSet<>(Component.Type.values().length); +private final long totalSize; + +public ComponentManifest(Map componentManifest) +{ +this.manifest = new LinkedHashMap<>(componentManifest); + +long size = 0; +for (Map.Entry entry : this.manifest.entrySet()) +{ +size += entry.getValue(); +this.components.add(Component.parse(entry.getKey().repr)); +} + +this.totalSize = size; +} + +public Long getSizeForType(Component.Type type) +{ +return manifest.get(type); +} + +public long getTotalSize() +{ +return totalSize; +} + +public Set getComponents() +{ +return Collections.unmodifiableSet(components); +} + +public boolean equals(Object o) +{ +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; +ComponentManifest that = (ComponentManifest) o; +return totalSize == that.totalSize && + Objects.equals(manifest, that.manifest); +} + +public int hashCode() +{ + +return Objects.hash(manifest, totalSize); +} + +public static final IVersionedSerializer serializer = new IVersionedSerializer() +{ +public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException +{ +out.writeInt(manifest.manifest.size()); +for (Map.Entry entry : manifest.manifest.entrySet()) +serialize(entry.getKey(), entry.getValue(), out); +} + +public ComponentManifest deserialize(DataInputPlus in, int version) throws IOException +{ 
+LinkedHashMap components = new LinkedHashMap<>(Component.Type.values().length); + +int size = in.readInt(); +assert size >= 0 : "Invalid number of components"; + +for (int i = 0; i < size; i++) +{ +Component.Type type = Component.Type.fromRepresentation(in.readByte()); +long length = in.readLong(); +components.put(type, length); +} + +return new ComponentManifest(components); +} + +public long serializedSize(ComponentManifest manifest, int version) +{ +long size = 0; +size += TypeSizes.sizeof(manifest.manifest.size()); +for (Map.Entry entry : manifest.manifest.entrySet()) +{ +size += TypeSizes.sizeof(entry.getKey().id); +size +=
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555862#comment-16555862 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205161880 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentManifest +{ +private final LinkedHashMap manifest; --- End diff -- Instead of having a map of type to size with a separate set of components, it'd be nicer to just have a map of `Component` -> `Long`, I think. 
One is definitely unnecessary. Nor do we save a lot by caching the total size.
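A minimal sketch of the single-map shape suggested here, under stated assumptions: `String` keys stand in for Cassandra's `Component` so the example is self-contained, and the class and method names are hypothetical. The component set is derived from the map's keys and the total is summed on demand instead of being cached.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the manifest; String keys replace Cassandra's Component type. */
final class SingleMapManifestSketch
{
    // One ordered map is the only state: component -> size in bytes.
    private final LinkedHashMap<String, Long> sizes;

    SingleMapManifestSketch(Map<String, Long> sizes)
    {
        // Defensive copy; a LinkedHashMap source keeps its insertion order.
        this.sizes = new LinkedHashMap<>(sizes);
    }

    /** Components in insertion order, derived from the map rather than stored separately. */
    Set<String> components()
    {
        return Collections.unmodifiableSet(sizes.keySet());
    }

    long sizeOf(String component)
    {
        Long size = sizes.get(component);
        if (size == null)
            throw new IllegalArgumentException("Unknown component: " + component);
        return size;
    }

    /** Summed on each call; the map holds only a handful of entries. */
    long totalSize()
    {
        long total = 0;
        for (long size : sizes.values())
            total += size;
        return total;
    }

    public static void main(String[] args)
    {
        Map<String, Long> sizes = new LinkedHashMap<>();
        sizes.put("Data.db", 4096L);
        sizes.put("Index.db", 512L);
        SingleMapManifestSketch manifest = new SingleMapManifestSketch(sizes);
        System.out.println(manifest.components() + " total=" + manifest.totalSize());
    }
}
```

With this shape, `equals` and `hashCode` would only need to consider the map, since the total is a function of it.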
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555703#comment-16555703 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205113992 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555704#comment-16555704 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205113636 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() --- End diff -- Presumably should be static too. 
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555700#comment-16555700 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205112906 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555701#comment-16555701 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205113409 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY,
+                                                 Component.DIGEST, Component.CRC);
+
+    public BigTableBlockWriter(Descriptor descriptor,
+                               TableMetadataRef metadata,
+                               LifecycleTransaction txn,
+                               final Set components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata,
+              DatabaseDescriptor.getDiskOptimizationStrategy());
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.txn = txn;
+        this.componentWriters = new HashMap<>(components.size());
--- End diff --

Can be an `EnumMap` (:
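A minimal sketch of the `EnumMap` suggestion, under stated assumptions: the archive drops the generic parameters of `componentWriters`, so the key type below (`ComponentType`) is a hypothetical enum, and plain strings stand in for the writer objects. It only illustrates the general technique of replacing a `HashMap` keyed by an enum with an `EnumMap`.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical demo class; none of these names come from the patch.
final class EnumMapDemo
{
    // Assumed stand-in for whatever enum identifies an sstable component.
    enum ComponentType { DATA, PRIMARY_INDEX, STATS, FILTER }

    // Builds one "writer" per requested component. EnumMap stores values in an
    // array indexed by ordinal, so no hashing is involved and iteration follows
    // the enum's declaration order.
    static Map<ComponentType, String> writersFor(Iterable<ComponentType> components)
    {
        Map<ComponentType, String> writers = new EnumMap<>(ComponentType.class);
        for (ComponentType component : components)
            writers.put(component, "writer-for-" + component.name());
        return writers;
    }
}
```

Unlike `new HashMap<>(components.size())`, the `EnumMap` constructor takes the key class rather than an initial capacity, so the size hint is no longer needed.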
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555699#comment-16555699 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205110273 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+    private final TableId tableId;
+    private final StreamSession session;
+    private final int sstableLevel;
+    private final SerializationHeader.Component header;
+    private final int fileSeqNum;
+    private final ComponentManifest manifest;
+    private final SSTableFormat.Type format;
+    private final Version version;
+    private final DecoratedKey firstKey;
+
+    public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        if (session.getPendingRepair() != null)
+        {
+            // we should only ever be streaming pending repair
+            // sstables if the session has a pending repair id
+            if (!session.getPendingRepair().equals(header.pendingRepair))
+                throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.",
--- End diff --

{} isn't a valid placeholder for `String.format()`. Maybe this used to be a logging call?
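To make the `String.format()` remark concrete, here is a small sketch. The message text is taken from the diff; the class and method names are hypothetical and exist only for illustration. `{}` is the SLF4J logging placeholder, while `String.format()` follows `java.util.Formatter` syntax, where `%s` is the conversion that substitutes an argument.

```java
import java.util.UUID;

// Hypothetical demo class; not part of the patch under review.
final class FormatPlaceholderDemo
{
    // "{}" means nothing to String.format(): it is copied through literally,
    // and the surplus argument is silently ignored.
    static String broken(Object tableId)
    {
        return String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", tableId);
    }

    // "%s" is the Formatter conversion that inserts the argument's string form.
    static String fixed(Object tableId)
    {
        return String.format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", tableId);
    }

    public static void main(String[] args)
    {
        UUID id = UUID.randomUUID(); // stand-in for header.tableId
        System.out.println(broken(id)); // the id never appears in this message
        System.out.println(fixed(id));
    }
}
```

Because the broken variant neither throws nor warns, the mistake would only show up when someone reads the exception message and finds a literal `({})` where the table id should be.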
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555695#comment-16555695 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205112542 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY,
+                                                 Component.DIGEST, Component.CRC);
+
+    public BigTableBlockWriter(Descriptor descriptor,
+                               TableMetadataRef metadata,
+                               LifecycleTransaction txn,
+                               final Set components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata,
+              DatabaseDescriptor.getDiskOptimizationStrategy());
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.txn = txn;
--- End diff --

The field is unused.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555698#comment-16555698 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205113518 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter
+{
+    private final TableMetadataRef metadata;
+    private final LifecycleTransaction txn;
+    private volatile SSTableReader finalReader;
+    private final Map componentWriters;
+
+    private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class);
--- End diff --

Logger field should be static.
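For reference, the conventional shape of a per-class logger looks roughly like the sketch below. This is an illustration under assumptions: it uses `java.util.logging` purely to stay dependency-free, whereas the patch uses SLF4J's `LoggerFactory`, and the class name is hypothetical.

```java
import java.util.logging.Logger;

// Hypothetical demo class; java.util.logging stands in for SLF4J here.
final class StaticLoggerDemo
{
    // Declared static: the logger belongs to the class, so creating more
    // instances of the class does not add a logger reference to each one.
    private static final Logger logger = Logger.getLogger(StaticLoggerDemo.class.getName());

    // Exposed only so the sketch can be exercised from outside the class.
    static Logger sharedLogger()
    {
        return logger;
    }
}
```

With SLF4J the declaration has the same shape, as the `CassandraBlockStreamReader` diff in this thread shows: `private static final Logger logger = LoggerFactory.getLogger(...)`.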
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555697#comment-16555697 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205112000 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555694#comment-16555694 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205107583 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555696#comment-16555696 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user iamaleksey commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205109588

--- Diff: src/java/org/apache/cassandra/db/compaction/Verifier.java ---
@@ -361,12 +361,26 @@ public RangeOwnHelper(List> normalizedRanges)
  * @throws RuntimeException if the key is not contained
  */
 public void check(DecoratedKey key)
+{
+    if (!checkBoolean(key))
--- End diff --

Yep, this is exactly what I had in mind, thanks. Not sure of the names, though. Would prefer the throwing `check()` to be renamed into `validate()`, and `checkBoolean()` to become `check()`. `validate()` is the name we often give to such methods (that throw on failure).
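The naming proposed in this comment can be sketched as follows. This is not the real `RangeOwnHelper`: the integer ranges, the class names, and the exception message are all hypothetical simplifications. The sketch only shows the split between a boolean-returning `check()` and a throwing `validate()` built on top of it.

```java
import java.util.List;

// Hypothetical demo; a simplified stand-in for the helper under review.
final class RangeCheckDemo
{
    // Assumed toy range over ints, inclusive on both ends.
    static final class IntRange
    {
        final int start;
        final int end;

        IntRange(int start, int end)
        {
            this.start = start;
            this.end = end;
        }

        boolean contains(int key)
        {
            return key >= start && key <= end;
        }
    }

    static final class RangeOwnHelperSketch
    {
        private final List<IntRange> ranges;

        RangeOwnHelperSketch(List<IntRange> ranges)
        {
            this.ranges = ranges;
        }

        // Non-throwing query: reports whether any range contains the key.
        boolean check(int key)
        {
            for (IntRange range : ranges)
                if (range.contains(key))
                    return true;
            return false;
        }

        // Throwing variant: same test, but failure is an error for the caller.
        void validate(int key)
        {
            if (!check(key))
                throw new RuntimeException("Key " + key + " is not contained in the given ranges");
        }
    }
}
```

Callers that only need a yes/no answer use `check()`, while callers that treat a miss as a bug use `validate()`.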
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555690#comment-16555690 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205107157 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus)
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555693#comment-16555693 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205110785

--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+    private final TableId tableId;
+    private final StreamSession session;
+    private final int sstableLevel;
+    private final SerializationHeader.Component header;
--- End diff --

This is an unused field. We'd be better off storing the `CassandraStreamHeader` here, though, and getting rid of all the redundant extracted fields (`version`, `firstKey`, etc.).
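The suggestion in the review comment above (hold the header object and drop the copied fields) can be sketched as follows. This is only an illustration under assumptions: `StreamHeaderSketch` and `HeaderHoldingReaderSketch` are hypothetical stand-ins, not the classes from the pull request, and the three fields are just the ones the comment names.

```java
import java.util.Objects;

final class HeaderHoldingReaderSketch
{
    /** Hypothetical stand-in for the stream header; not the real CassandraStreamHeader. */
    static final class StreamHeaderSketch
    {
        final String version;
        final String firstKey;
        final int sstableLevel;

        StreamHeaderSketch(String version, String firstKey, int sstableLevel)
        {
            this.version = version;
            this.firstKey = firstKey;
            this.sstableLevel = sstableLevel;
        }
    }

    // A single reference replaces the separately copied version/firstKey/sstableLevel fields.
    private final StreamHeaderSketch header;

    HeaderHoldingReaderSketch(StreamHeaderSketch header)
    {
        this.header = Objects.requireNonNull(header, "header");
    }

    // Values are read through the header at the point of use instead of being duplicated.
    String describe()
    {
        return String.format("version=%s firstKey=%s level=%d",
                             header.version, header.firstKey, header.sstableLevel);
    }

    public static void main(String[] args)
    {
        StreamHeaderSketch header = new StreamHeaderSketch("na", "k1", 0);
        System.out.println(new HeaderHoldingReaderSketch(header).describe());
    }
}
```

Keeping one reference means a field added to the header later does not need a matching field and constructor assignment in the reader.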
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555691#comment-16555691 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205108073 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555235#comment-16555235 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I *think* I've resolved all your comments. The last dtest run was all green. I'm not sure about the latest push, but I don't foresee any breakages. Please let me know if more changes are required.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16555229#comment-16555229 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204998639

--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
@@ -62,7 +90,23 @@ public CassandraOutgoingFile(StreamOperation operation, Ref ref,
                                         sections,
                                         sstable.compression ? sstable.getCompressionMetadata() : null,
                                         keepSSTableLevel ? sstable.getSSTableLevel() : 0,
-                                        sstable.header.toComponent());
+                                        sstable.header.toComponent(), manifest, shouldStreamFullSSTable(),
+                                        sstable.first,
+                                        sstable.metadata().id);
+    }
+
+    @VisibleForTesting
+    public static ComponentManifest getComponentManifest(SSTableReader sstable)
+    {
+        LinkedHashMap components = new LinkedHashMap<>(STREAM_COMPONENTS.size());
+        for (Component component : STREAM_COMPONENTS)
--- End diff --

@iamaleksey would it be worth having a list of required components here, so that any one of them being missing is fatal? For example, when I reset the level on the receiving side, I expect the stats component to exist and to have been sent over.
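The question raised above, whether a missing required component should be fatal while the manifest is built, can be sketched like this. It is an illustration under assumptions, not the code from the pull request: `ComponentManifestSketch`, the component names, and the choice of which components count as required are all hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ComponentManifestSketch
{
    // Illustrative component names, listed in the order they would be streamed.
    static final List<String> STREAM_COMPONENTS =
        Arrays.asList("Data.db", "Index.db", "Statistics.db", "Filter.db", "Summary.db");

    // Components the receiver cannot do without (an assumption made for this sketch).
    static final Set<String> REQUIRED_COMPONENTS =
        new HashSet<>(Arrays.asList("Data.db", "Index.db", "Statistics.db"));

    /** Builds an ordered name-to-length manifest; fails fast if a required component is absent. */
    static LinkedHashMap<String, Long> build(Map<String, Long> sizesOnDisk)
    {
        LinkedHashMap<String, Long> manifest = new LinkedHashMap<>(STREAM_COMPONENTS.size());
        for (String component : STREAM_COMPONENTS)
        {
            Long length = sizesOnDisk.get(component);
            if (length != null)
                manifest.put(component, length);
            else if (REQUIRED_COMPONENTS.contains(component))
                throw new IllegalStateException("Required component missing: " + component);
            // Optional components that are absent are simply skipped.
        }
        return manifest;
    }

    public static void main(String[] args)
    {
        Map<String, Long> onDisk = new HashMap<>();
        onDisk.put("Statistics.db", 4096L);
        onDisk.put("Data.db", 1048576L);
        onDisk.put("Index.db", 65536L);
        System.out.println(build(onDisk));
    }
}
```

A `LinkedHashMap` keeps the manifest in streaming order regardless of the order in which sizes were discovered, which is presumably why the diff uses one.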
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554978#comment-16554978 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204955316

--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
@@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version)
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();
 
-        CassandraStreamWriter writer = header.compressionInfo == null ?
-                                       new CassandraStreamWriter(sstable, header.sections, session) :
-                                       new CompressedCassandraStreamWriter(sstable, header.sections,
-                                                                           header.compressionInfo, session);
+        IStreamWriter writer;
+        if (shouldStreamFullSSTable())
+        {
+            writer = new CassandraBlockStreamWriter(sstable, session, components);
+        }
+        else
+        {
+            writer = (header.compressionInfo == null) ?
+                     new CassandraStreamWriter(sstable, header.sections, session) :
+                     new CompressedCassandraStreamWriter(sstable, header.sections,
+                                                         header.compressionInfo, session);
+        }
         writer.write(out);
     }
 
+    @VisibleForTesting
+    public boolean shouldStreamFullSSTable()
+    {
+        return isFullSSTableTransfersEnabled && isFullyContained;
+    }
+
+    @VisibleForTesting
+    public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+    {
+        if (normalizedRanges == null)
+            return false;
+
+        RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
+        {
+            while (iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                try
+                {
+                    rangeOwnHelper.check(key);
+                } catch(RuntimeException e)
--- End diff --

I will disable Zero Copy streaming for STCS and I've created [CASSANDRA-14586](https://issues.apache.org/jira/browse/CASSANDRA-14586) to further discuss fixes.
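The containment check in the diff above walks every key of the sstable and asks whether it falls inside the requested ranges. A minimal standalone sketch of that idea, under stated assumptions, is given here: tokens are plain `long` values, ranges are non-wrapping `(left, right]` intervals that are sorted and non-overlapping, and `FullContainmentSketch` and `TokenRange` are hypothetical names rather than Cassandra classes. It returns a boolean directly instead of catching the exception that the helper in the diff appears to throw.

```java
import java.util.Arrays;
import java.util.List;

final class FullContainmentSketch
{
    /** Hypothetical token range: left bound exclusive, right bound inclusive, no wrap-around. */
    static final class TokenRange
    {
        final long left;
        final long right;

        TokenRange(long left, long right)
        {
            if (left >= right)
                throw new IllegalArgumentException("wrapping ranges are out of scope for this sketch");
            this.left = left;
            this.right = right;
        }

        boolean contains(long token)
        {
            return token > left && token <= right;
        }
    }

    /**
     * Returns true only if every key token falls inside one of the ranges.
     * Both inputs must be sorted ascending, and the ranges must not overlap.
     */
    static boolean fullyContainedIn(List<TokenRange> sortedRanges, long[] sortedKeyTokens)
    {
        if (sortedRanges == null || sortedRanges.isEmpty())
            return false;

        int r = 0;
        for (long token : sortedKeyTokens)
        {
            // Skip ranges that end before this token; later tokens can never match them either.
            while (r < sortedRanges.size() && sortedRanges.get(r).right < token)
                r++;
            if (r == sortedRanges.size() || !sortedRanges.get(r).contains(token))
                return false;
        }
        return true;
    }

    public static void main(String[] args)
    {
        List<TokenRange> ranges = Arrays.asList(new TokenRange(0, 100), new TokenRange(200, 300));
        System.out.println(fullyContainedIn(ranges, new long[]{ 10, 50, 250 }));
        System.out.println(fullyContainedIn(ranges, new long[]{ 10, 150 }));
    }
}
```

Because both inputs are sorted, the range index only moves forward, so the check costs one pass over the keys plus one pass over the ranges. The cost of iterating every key is still proportional to the sstable size, which may be part of what the follow-up ticket discusses.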
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554976#comment-16554976 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204955090

--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentInfo
--- End diff --

I have refactored this to `ComponentManifest`.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554975#comment-16554975 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204955043

--- Diff: src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---
@@ -66,7 +66,7 @@
     private final LogReplicaSet replicas = new LogReplicaSet();
 
     // The transaction records, this set must be ORDER PRESERVING
-    private final LinkedHashSet records = new LinkedHashSet<>();
+    private final Set records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554
--- End diff --

I should have worded the comment differently, but it is a hack to avoid a `ConcurrentModificationException` ([CASSANDRA-14554](https://issues.apache.org/jira/browse/CASSANDRA-14554)). The underlying problem already exists in trunk and was not introduced by this PR, hence the separate ticket.
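For readers unfamiliar with the workaround in the LogFile diff above, here is a small sketch of what wrapping a `LinkedHashSet` in `Collections.synchronizedSet` does and does not provide. `OrderedRecordsSketch` is a hypothetical class using `String` records for illustration; it is not taken from Cassandra.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class OrderedRecordsSketch
{
    // LinkedHashSet keeps insertion order; the wrapper makes each single call (add, contains, ...) atomic.
    private final Set<String> records = Collections.synchronizedSet(new LinkedHashSet<>());

    boolean add(String record)
    {
        return records.add(record);
    }

    List<String> snapshot()
    {
        // The wrapper does not guard iteration. Copying iterates the set, so the caller
        // must hold the set's own monitor, as the Collections.synchronizedSet Javadoc requires.
        synchronized (records)
        {
            return new ArrayList<>(records);
        }
    }

    public static void main(String[] args)
    {
        OrderedRecordsSketch log = new OrderedRecordsSketch();
        log.add("ADD:b");
        log.add("ADD:a");
        log.add("ADD:b"); // duplicate, ignored
        System.out.println(log.snapshot());
    }
}
```

The wrapper only removes the race on individual calls. Any code path that iterates the set without the explicit `synchronized` block can still hit a `ConcurrentModificationException`, which fits the description of the change as a stopgap.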
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554973#comment-16554973 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204954749

--- Diff: src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---
@@ -2260,6 +2260,20 @@ public static int getStreamingConnectionsPerHost()
         return conf.streaming_connections_per_host;
     }
 
+    public static boolean isFullSSTableTransfersEnabled()
+    {
+        if (conf.server_encryption_options.enabled || conf.server_encryption_options.optional)
+        {
+            logger.debug("Internode encryption enabled. Disabling zero copy SSTable transfers for streaming.");
--- End diff --

Done
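The diff above is cut off right after the log statement, so the rest of the method is not visible here. The sketch below shows one plausible shape for such a gate and should be read as an assumption: `ZeroCopyGateSketch`, `EncryptionOptionsSketch`, and the `configuredFlag` parameter are hypothetical, and the fall-through to a configured flag is a guess at what follows the truncated hunk. The likely motivation is that encrypting traffic requires the bytes to pass through user space, which defeats a zero-copy file transfer.

```java
final class ZeroCopyGateSketch
{
    /** Hypothetical stand-in for the server encryption settings referenced in the diff. */
    static final class EncryptionOptionsSketch
    {
        final boolean enabled;
        final boolean optional;

        EncryptionOptionsSketch(boolean enabled, boolean optional)
        {
            this.enabled = enabled;
            this.optional = optional;
        }
    }

    /**
     * Zero-copy transfers are refused whenever internode encryption is on or optional;
     * otherwise the operator-configured flag decides (the fall-through is an assumption).
     */
    static boolean isFullSSTableTransfersEnabled(boolean configuredFlag, EncryptionOptionsSketch encryption)
    {
        if (encryption.enabled || encryption.optional)
            return false;
        return configuredFlag;
    }

    public static void main(String[] args)
    {
        System.out.println(isFullSSTableTransfersEnabled(true, new EncryptionOptionsSketch(false, false)));
        System.out.println(isFullSSTableTransfersEnabled(true, new EncryptionOptionsSketch(true, false)));
    }
}
```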
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554974#comment-16554974 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204954805

--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.Collectors3;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+    protected final TableId tableId;
+    protected final StreamSession session;
+    protected final int sstableLevel;
--- End diff --

Cleaned up the protected fields. I still need to reset the level.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554971#comment-16554971 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204954729

--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentInfo
+{
+    final Component.Type type;
+    final long length;
+
+    public ComponentInfo(Component.Type type, long length)
+    {
+        assert length >= 0 : "Component length cannot be negative";
+        this.type = type;
+        this.length = length;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ComponentInfo{" +
+               "type=" + type +
+               ", length=" + length +
+               '}';
+    }
+
+    public boolean equals(Object o)
--- End diff --

`CassandraStreamHeader`'s `equals()` and `hashCode()` methods are not dead code. They're used in `CassandraStreamHeaderTest::serializerTest`, which eventually ends up in [`SerializationUtils::assertSerializationCycle`](https://github.com/apache/cassandra/blob/9714a7c817b64a3358f69e536535c756c5c6df48/test/unit/org/apache/cassandra/serializers/SerializationUtils.java#L60). This is what originally forced me to implement those methods. The only other choice was to test the equality of `ComponentInfo` inside `CassandraStreamHeader::equals()`, which would have been a bad choice.
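The argument above is that a serialization round-trip test needs value equality on the nested type. A minimal sketch of that pattern follows, under assumptions: `ComponentInfoSketch` is a hypothetical stand-in that stores the component type as a `String`, uses `java.util.Objects` rather than the commons-lang builders imported in the diff, and writes a made-up wire format with `DataOutputStream`. It does not reproduce Cassandra's serializers or its `assertSerializationCycle` helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

final class ComponentInfoSketch
{
    final String type;
    final long length;

    ComponentInfoSketch(String type, long length)
    {
        if (length < 0)
            throw new IllegalArgumentException("Component length cannot be negative");
        this.type = Objects.requireNonNull(type, "type");
        this.length = length;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (!(o instanceof ComponentInfoSketch))
            return false;
        ComponentInfoSketch that = (ComponentInfoSketch) o;
        return length == that.length && type.equals(that.type);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(type, length);
    }

    /** Serializes to bytes and reads the value back; the format is invented for this sketch. */
    static ComponentInfoSketch roundTrip(ComponentInfoSketch original)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes))
            {
                out.writeUTF(original.type);
                out.writeLong(original.length);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())))
            {
                return new ComponentInfoSketch(in.readUTF(), in.readLong());
            }
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        ComponentInfoSketch original = new ComponentInfoSketch("DATA", 1024L);
        ComponentInfoSketch copy = roundTrip(original);
        // Without equals(), this comparison would fall back to reference identity and always fail.
        if (!original.equals(copy) || original.hashCode() != copy.hashCode())
            throw new AssertionError("round trip changed the value");
        System.out.println("round trip preserved equality");
    }
}
```

With `equals()` defined on the nested type, an enclosing header can delegate to it, which avoids comparing the nested fields by hand inside the header's own `equals()`.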
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554962#comment-16554962 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I have resolved most of your comments. I still have a couple to go. I will update this PR when I am done with those and get a clean dtest run.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554684#comment-16554684 ]

ASF GitHub Bot commented on CASSANDRA-14556:

Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204875677

--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentInfo
+{
+    final Component.Type type;
+    final long length;
+
+    public ComponentInfo(Component.Type type, long length)
+    {
+        assert length >= 0 : "Component length cannot be negative";
+        this.type = type;
+        this.length = length;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "ComponentInfo{" +
+               "type=" + type +
+               ", length=" + length +
+               '}';
+    }
+
+    public boolean equals(Object o)
--- End diff --

I shouldn't have said "it's generally considered" here, as apparently it's not quite, as @aweisberg pointed out. It's a subjective preference, and what I'd do, but feel free to disregard this particular comment.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554675#comment-16554675 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204874237 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java --- @@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version) CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); -CassandraStreamWriter writer = header.compressionInfo == null ? - new CassandraStreamWriter(sstable, header.sections, session) : - new CompressedCassandraStreamWriter(sstable, header.sections, - header.compressionInfo, session); +IStreamWriter writer; +if (shouldStreamFullSSTable()) +{ +writer = new CassandraBlockStreamWriter(sstable, session, components); +} +else +{ +writer = (header.compressionInfo == null) ? + new CassandraStreamWriter(sstable, header.sections, session) : + new CompressedCassandraStreamWriter(sstable, header.sections, + header.compressionInfo, session); +} writer.write(out); } +@VisibleForTesting +public boolean shouldStreamFullSSTable() +{ +return isFullSSTableTransfersEnabled && isFullyContained; +} + +@VisibleForTesting +public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable) +{ +if (normalizedRanges == null) +return false; + +RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges); +try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) +{ +while (iter.hasNext()) +{ +DecoratedKey key = iter.next(); +try +{ +rangeOwnHelper.check(key); +} catch(RuntimeException e) --- End diff -- @dineshjoshi Duplicating a reply here: I think it's fine to not block this PR on the improvements, but disable zero copy for STCS for now, and commit that way. 
Once we optimise the containment check - ideally before 4.0 - we can enable it for STCS as well. Encoding information about the effective contained ranges is one way. An alternative would be to invert the ranges we are checking for and look for the presence of keys in the gaps, relying on the summary and index to search efficiently rather than scanning linearly. But again - let's temporarily disable zero copy for STCS, so as not to delay the ticket more than necessary. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
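The gap-inversion idea from the comment above can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the approach that was committed: tokens are plain `long` values, a sorted `long[]` stands in for the sstable's index/summary, ranges are non-wrapping `(left, right]` pairs that are already sorted and non-overlapping, and the class and method names are hypothetical.

```java
final class GapContainmentSketch
{
    /** Index of the first element strictly greater than x, or sorted.length if none. */
    static int firstGreaterThan(long[] sorted, long x)
    {
        int lo = 0, hi = sorted.length;
        while (lo < hi)
        {
            int mid = (lo + hi) >>> 1;
            if (sorted[mid] <= x)
                lo = mid + 1;
            else
                hi = mid;
        }
        return lo;
    }

    /**
     * ranges[i] = {left, right} meaning (left, right]; sorted, non-overlapping, non-wrapping.
     * Instead of testing every token, probe each gap between ranges with a binary search.
     */
    static boolean fullyContained(long[] sortedTokens, long[][] ranges)
    {
        if (sortedTokens.length == 0)
            return true;
        if (ranges.length == 0)
            return false;

        // Anything at or before the first left bound, or after the last right bound, is outside.
        if (sortedTokens[0] <= ranges[0][0])
            return false;
        if (sortedTokens[sortedTokens.length - 1] > ranges[ranges.length - 1][1])
            return false;

        // Each gap between consecutive ranges is (right_i, left_{i+1}].
        for (int i = 0; i + 1 < ranges.length; i++)
        {
            long gapLeftExclusive = ranges[i][1];
            long gapRightInclusive = ranges[i + 1][0];
            int idx = firstGreaterThan(sortedTokens, gapLeftExclusive);
            if (idx < sortedTokens.length && sortedTokens[idx] <= gapRightInclusive)
                return false; // found a token sitting in a gap
        }
        return true;
    }
}
```

With R ranges and N tokens this does on the order of R binary searches rather than N comparisons, which is where the saving over a linear scan would come from when R is much smaller than N.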
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554669#comment-16554669 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204872490 --- Diff: src/java/org/apache/cassandra/db/lifecycle/LogFile.java --- @@ -66,7 +66,7 @@ private final LogReplicaSet replicas = new LogReplicaSet(); // The transaction records, this set must be ORDER PRESERVING -private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>(); +private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554 --- End diff -- @dineshjoshi Ah. I see, that makes sense. The comment made it seem like this was supposed to be the fix for the issue, but this is just for the tests to pass? I'm good with that, given the extra context. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554560#comment-16554560 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204842484 --- Diff: src/java/org/apache/cassandra/db/lifecycle/LogFile.java --- @@ -66,7 +66,7 @@ private final LogReplicaSet replicas = new LogReplicaSet(); // The transaction records, this set must be ORDER PRESERVING -private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>(); +private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554 --- End diff -- @iamaleksey I don't think it is sufficient but we have to address this conclusively as part of 14554. I've already spoken to @belliottsmith regarding it and have a plan to fix this. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16554515#comment-16554515 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204831709 --- Diff: src/java/org/apache/cassandra/db/lifecycle/LogFile.java --- @@ -66,7 +66,7 @@ private final LogReplicaSet replicas = new LogReplicaSet(); // The transaction records, this set must be ORDER PRESERVING -private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>(); +private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554 --- End diff -- I'm not very familiar with the `LifecycleTransaction` code, but I'm not sure this is sufficient as a workaround. @belliottsmith Could you please confirm that this is - or isn't - sufficient? Here and the equivalent change to `LogReplicaSet`. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
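One general reason a `Collections.synchronizedSet` wrapper may not be enough on its own: the wrapper makes individual calls such as `add` thread-safe, but the JDK documentation requires callers to synchronize on the returned set while iterating over it. The sketch below shows that pattern in isolation. It is not taken from `LogFile`; the class name is hypothetical and `String` stands in for the real record type.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class SynchronizedSetSketch
{
    // Stand-in for the `records` field; insertion order is preserved by LinkedHashSet.
    private final Set<String> records = Collections.synchronizedSet(new LinkedHashSet<>());

    void add(String record)
    {
        records.add(record); // a single call is safe through the wrapper
    }

    List<String> snapshot()
    {
        List<String> copy = new ArrayList<>();
        // Iteration is a compound operation: hold the set's own monitor for its duration.
        synchronized (records)
        {
            for (String record : records)
                copy.add(record);
        }
        return copy;
    }
}
```

Any code path that iterates the set without taking this lock could still observe a `ConcurrentModificationException`, which is presumably part of why the change is described as a hack pending CASSANDRA-14554.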
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553565#comment-16553565 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204586299 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java --- @@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version) CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); -CassandraStreamWriter writer = header.compressionInfo == null ? - new CassandraStreamWriter(sstable, header.sections, session) : - new CompressedCassandraStreamWriter(sstable, header.sections, - header.compressionInfo, session); +IStreamWriter writer; +if (shouldStreamFullSSTable()) +{ +writer = new CassandraBlockStreamWriter(sstable, session, components); +} +else +{ +writer = (header.compressionInfo == null) ? + new CassandraStreamWriter(sstable, header.sections, session) : + new CompressedCassandraStreamWriter(sstable, header.sections, + header.compressionInfo, session); +} writer.write(out); } +@VisibleForTesting +public boolean shouldStreamFullSSTable() +{ +return isFullSSTableTransfersEnabled && isFullyContained; +} + +@VisibleForTesting +public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable) +{ +if (normalizedRanges == null) +return false; + +RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges); +try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) +{ +while (iter.hasNext()) +{ +DecoratedKey key = iter.next(); +try +{ +rangeOwnHelper.check(key); +} catch(RuntimeException e) --- End diff -- @iamaleksey thank you for the useful feedback. I did discuss this with @krummas and I believe while there was room for improvement, the thinking back then was that the benefits would outweigh the cost. 
I looked through the codebase and this was the best way to definitively verify range containment, as I was going for correctness. That said, what you suggest is obviously better. I am concerned about scope creep in this PR. Would it be ok if we address it as part of a separate PR? It would also be useful if we could design the effective range computation and storage in the metadata. I am not sure what sort of gotchas I might run into. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553429#comment-16553429 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204562307 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java --- @@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version) CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); -CassandraStreamWriter writer = header.compressionInfo == null ? - new CassandraStreamWriter(sstable, header.sections, session) : - new CompressedCassandraStreamWriter(sstable, header.sections, - header.compressionInfo, session); +IStreamWriter writer; +if (shouldStreamFullSSTable()) +{ +writer = new CassandraBlockStreamWriter(sstable, session, components); +} +else +{ +writer = (header.compressionInfo == null) ? + new CassandraStreamWriter(sstable, header.sections, session) : + new CompressedCassandraStreamWriter(sstable, header.sections, + header.compressionInfo, session); +} writer.write(out); } +@VisibleForTesting +public boolean shouldStreamFullSSTable() +{ +return isFullSSTableTransfersEnabled && isFullyContained; +} + +@VisibleForTesting +public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable) +{ +if (normalizedRanges == null) +return false; + +RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges); +try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) +{ +while (iter.hasNext()) +{ +DecoratedKey key = iter.next(); +try +{ +rangeOwnHelper.check(key); +} catch(RuntimeException e) --- End diff -- I mistakenly thought this index was a sampled index, not a full index. Requiring a comparison of every partition key in every sstable for the entire data set seems like a big regression for some use cases. 
I was trying and failing to find the reasoning for why we switched to this. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553343#comment-16553343 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204537184 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentInfo +{ +final Component.Type type; +final long length; + +public ComponentInfo(Component.Type type, long length) +{ +assert length >= 0 : "Component length cannot be negative"; +this.type = type; +this.length = length; +} + +@Override +public String toString() +{ +return "ComponentInfo{" + + "type=" + type + + ", length=" + length + + '}'; +} + +public boolean equals(Object o) --- End diff -- It's generally considered to be a bad practice to implement `equals()` and `hashCode()` unless that class is stored in a set or a map - or an upstream implementation of such. Otherwise it's just confusing boilerplate (confusing because it implies that the class is used in ways it clearly isn't). In this case, there is a `List` field in `CassandraStreamHeader`, which has an `equals()`/`hashCode()` implementation, which on the surface justifies these. But those, in turn, are actually dead code. So what we should do is remove the implementations of `equals()` and `hashCode()` here, and do the same in `CassandraStreamHeader`, being good citizens. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. 
This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553342#comment-16553342 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204538045 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentInfo --- End diff -- Now, if you look carefully at all current uses of `ComponentInfo`, you'll see that it's only ever being used as an element of lists, and never as a separate entity. 
As such it would be cleaner - and nicer to work with - to implement a class, say, `ComponentManifest` that would have an ordered map of `Component.Type` to `long` size, and expose the ordered keyset, with serializers for the whole manifest. As it's written now, ser/deser code is leaking outside, while it could instead be nicely encapsulated here. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
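The `ComponentManifest` suggestion above might look something like the sketch below: an ordered map from component type to size that owns its own serialization. Everything here is illustrative; the class name, the three-value `Type` enum (a stand-in for `Component.Type`) and the wire format are assumptions, and plain `java.io` streams are used instead of Cassandra's `DataInputPlus`/`DataOutputPlus`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

final class ComponentManifestSketch
{
    // Stand-in for Component.Type; the real enum has more values.
    enum Type { DATA, PRIMARY_INDEX, STATS }

    // LinkedHashMap keeps the order in which components will be streamed.
    private final LinkedHashMap<Type, Long> sizes;

    ComponentManifestSketch(Map<Type, Long> sizes)
    {
        this.sizes = new LinkedHashMap<>(sizes);
    }

    Set<Type> components()
    {
        return Collections.unmodifiableSet(sizes.keySet());
    }

    long sizeOf(Type type)
    {
        Long size = sizes.get(type);
        if (size == null)
            throw new IllegalArgumentException("Component not in manifest: " + type);
        return size;
    }

    // Hypothetical wire format: entry count, then (type name, length) pairs in order.
    void serialize(DataOutputStream out) throws IOException
    {
        out.writeInt(sizes.size());
        for (Map.Entry<Type, Long> entry : sizes.entrySet())
        {
            out.writeUTF(entry.getKey().name());
            out.writeLong(entry.getValue());
        }
    }

    static ComponentManifestSketch deserialize(DataInputStream in) throws IOException
    {
        int count = in.readInt();
        LinkedHashMap<Type, Long> sizes = new LinkedHashMap<>();
        for (int i = 0; i < count; i++)
        {
            Type type = Type.valueOf(in.readUTF());
            sizes.put(type, in.readLong());
        }
        return new ComponentManifestSketch(sizes);
    }

    /** Convenience for trying the sketch out: serialize to memory and read it back. */
    static ComponentManifestSketch roundTrip(ComponentManifestSketch manifest)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            manifest.serialize(new DataOutputStream(bytes));
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the design is that callers only ever see the ordered key set and per-component sizes, while the encoding stays private to the manifest.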
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553322#comment-16553322 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204531887 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java --- @@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version) CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); -CassandraStreamWriter writer = header.compressionInfo == null ? - new CassandraStreamWriter(sstable, header.sections, session) : - new CompressedCassandraStreamWriter(sstable, header.sections, - header.compressionInfo, session); +IStreamWriter writer; +if (shouldStreamFullSSTable()) +{ +writer = new CassandraBlockStreamWriter(sstable, session, components); +} +else +{ +writer = (header.compressionInfo == null) ? + new CassandraStreamWriter(sstable, header.sections, session) : + new CompressedCassandraStreamWriter(sstable, header.sections, + header.compressionInfo, session); +} writer.write(out); } +@VisibleForTesting +public boolean shouldStreamFullSSTable() +{ +return isFullSSTableTransfersEnabled && isFullyContained; +} + +@VisibleForTesting +public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable) +{ +if (normalizedRanges == null) +return false; + +RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges); +try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) +{ +while (iter.hasNext()) +{ +DecoratedKey key = iter.next(); +try +{ +rangeOwnHelper.check(key); +} catch(RuntimeException e) --- End diff -- On a more general note, this is potentially quite an expensive thing to do, especially for big sstables with skinny partitions, and in some cases this will introduce a performance regression. 
The whole optimisation is realistically only useful for bootstrap, decom, and rebuild, with LCS (which is still plenty useful and impactful and worth having). But it wouldn't normally kick in for regular repairs because of the full-cover requirement, and it won't normally kick in for STCS until CASSANDRA-10540 (range aware compaction) is implemented. In those cases having to read through the whole primary index is a perf regression that we shouldn't allow to happen. The easiest way to avoid it would be to store sstable's effective token ranges in sstable metadata in relation to the node's ranges, making this check essentially free. Otherwise we should probably disable complete sstable streaming for STCS tables, at least until CASSANDRA-10540 is implemented. That however wouldn't address the regression to regular streaming, so keeping ranges in the metadata would be my preferred way to go. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
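If an sstable's effective token ranges were kept in its metadata, as proposed above, the containment check would reduce to comparing two short range lists instead of reading the primary index. A minimal sketch of that comparison follows. It assumes tokens are `long` values, ranges are non-wrapping `(left, right]` pairs, and the requested ranges are normalized so that each stored range must fit inside a single requested one; how the effective ranges would actually be computed and stored is left open in the thread, and the names here are hypothetical.

```java
final class EffectiveRangesSketch
{
    /**
     * requested[i] and effective[j] are {left, right} pairs meaning (left, right].
     * Returns true if every stored effective range lies inside some requested range.
     */
    static boolean covers(long[][] requested, long[][] effective)
    {
        for (long[] stored : effective)
        {
            boolean covered = false;
            for (long[] wanted : requested)
            {
                if (wanted[0] <= stored[0] && stored[1] <= wanted[1])
                {
                    covered = true;
                    break;
                }
            }
            if (!covered)
                return false;
        }
        return true;
    }
}
```

The cost depends only on the number of ranges, not on the number of partitions in the sstable, which is what would make the check "essentially free" in the sense of the comment.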
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553033#comment-16553033 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204458523 --- Diff: src/java/org/apache/cassandra/config/DatabaseDescriptor.java --- @@ -2260,6 +2260,20 @@ public static int getStreamingConnectionsPerHost() return conf.streaming_connections_per_host; } +public static boolean isFullSSTableTransfersEnabled() +{ +if (conf.server_encryption_options.enabled || conf.server_encryption_options.optional) +{ +logger.debug("Internode encryption enabled. Disabling zero copy SSTable transfers for streaming."); --- End diff -- Nobody will ever see this at `debug` level. We should at minimum `warn` if `streaming_zerocopy_sstables_enabled` and internode encryption are both enabled at the same time. > Optimize streaming path in Cassandra > > > Key: CASSANDRA-14556 > URL: https://issues.apache.org/jira/browse/CASSANDRA-14556 > Project: Cassandra > Issue Type: Improvement > Components: Streaming and Messaging >Reporter: Dinesh Joshi >Assignee: Dinesh Joshi >Priority: Major > Labels: Performance > Fix For: 4.x > > > During streaming, Cassandra reifies the sstables into objects. This creates > unnecessary garbage and slows down the whole streaming process as some > sstables can be transferred as a whole file rather than individual > partitions. The objective of the ticket is to detect when a whole sstable can > be transferred and skip the object reification. We can also use a zero-copy > path to avoid bringing data into user-space on both sending and receiving > side. -- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org
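The behaviour asked for in the review comment above, warning only when the operator has requested zero-copy streaming and internode encryption forces it off, could be sketched as below. This is a standalone illustration rather than the `DatabaseDescriptor` code: the configuration values are passed in as plain booleans, `java.util.logging` stands in for the project's logger, and the class and method names are hypothetical.

```java
import java.util.logging.Logger;

final class ZeroCopyConfigSketch
{
    private static final Logger logger = Logger.getLogger(ZeroCopyConfigSketch.class.getName());

    /**
     * zeroCopyRequested models streaming_zerocopy_sstables_enabled; the other two
     * flags model server_encryption_options.enabled and .optional.
     */
    static boolean fullSSTableTransfersEnabled(boolean zeroCopyRequested,
                                               boolean encryptionEnabled,
                                               boolean encryptionOptional)
    {
        boolean encrypted = encryptionEnabled || encryptionOptional;
        if (zeroCopyRequested && encrypted)
        {
            // Warn rather than debug: the operator asked for a feature that is being turned off.
            logger.warning("Internode encryption is enabled; zero copy SSTable transfers are requested but will be disabled.");
            return false;
        }
        return zeroCopyRequested;
    }
}
```

Logging only in the conflicting case keeps the message meaningful: clusters that never asked for zero-copy streaming see nothing.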
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16553030#comment-16553030 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204457225
--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.Collectors3;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+    protected final TableId tableId;
+    protected final StreamSession session;
+    protected final int sstableLevel;
--- End diff --
It has taken me some time (and @krummas's help) to prove that this wasn't a correctness issue, but at best this is confusing/misleading code. We extract `sstableLevel` from the header, but don't use it anywhere. Instead, since we stream `StatsMetadata` directly, we also inherit the level from there - regardless of whether `CassandraOutgoingStream.keepSSTableLevel` is set to `true`. If the `LeveledManifest.canAddSSTable` check didn't exist, we'd be in trouble here. For clarity, I would probably look at that flag, and explicitly reset the level to `L0` if `keepSSTableLevel` is set to `false`.
P.S. What's the deal with all these `protected` fields?
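The explicit reset proposed in this comment could look roughly like the sketch below. Everything in it is an assumption made for illustration: `StreamedStatsSketch` is a hypothetical, minimal stand-in for the streamed `StatsMetadata`, and `applyLevelPolicy` is an invented helper name, not something from the patch.

```java
/** Hypothetical, minimal stand-in for the streamed stats metadata; only the level is modelled. */
final class StreamedStatsSketch
{
    final int sstableLevel;

    StreamedStatsSketch(int sstableLevel)
    {
        this.sstableLevel = sstableLevel;
    }

    /** Returns a copy with a different level, leaving this instance untouched. */
    StreamedStatsSketch withLevel(int newLevel)
    {
        return new StreamedStatsSketch(newLevel);
    }
}

final class LevelResetSketch
{
    /**
     * Makes the receiver's intent explicit: keep the sender's level only when asked to,
     * otherwise drop the incoming SSTable back to L0 instead of silently inheriting the level.
     */
    static StreamedStatsSketch applyLevelPolicy(StreamedStatsSketch streamed, boolean keepSSTableLevel)
    {
        if (keepSSTableLevel)
            return streamed;
        return streamed.withLevel(0);
    }
}
```

With this shape the level decision is visible at the point where the metadata is received, rather than relying on a later check elsewhere to reject an unsuitable level.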
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552971#comment-16552971 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204437695
--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
@@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version)
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();

-        CassandraStreamWriter writer = header.compressionInfo == null ?
-                                       new CassandraStreamWriter(sstable, header.sections, session) :
-                                       new CompressedCassandraStreamWriter(sstable, header.sections,
-                                                                           header.compressionInfo, session);
+        IStreamWriter writer;
+        if (shouldStreamFullSSTable())
+        {
+            writer = new CassandraBlockStreamWriter(sstable, session, components);
+        }
+        else
+        {
+            writer = (header.compressionInfo == null) ?
+                     new CassandraStreamWriter(sstable, header.sections, session) :
+                     new CompressedCassandraStreamWriter(sstable, header.sections,
+                                                         header.compressionInfo, session);
+        }
         writer.write(out);
     }

+    @VisibleForTesting
+    public boolean shouldStreamFullSSTable()
+    {
+        return isFullSSTableTransfersEnabled && isFullyContained;
+    }
+
+    @VisibleForTesting
+    public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+    {
+        if (normalizedRanges == null)
+            return false;
+
+        RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
+        {
+            while (iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                try
+                {
+                    rangeOwnHelper.check(key);
+                } catch(RuntimeException e)
--- End diff --
Catching `RuntimeException` is really not the way we should be using `RangeOwnHelper` here. Can you refactor `RangeOwnHelper` to introduce a method that would return a `boolean` instead?
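The refactoring requested here might take a shape like the following sketch. It is not the real `RangeOwnHelper`: tokens are simplified to `long`, ranges are `(left, right]` without wraparound, keys are assumed to arrive in ascending token order, and the `validate` method name is an assumption chosen for illustration.

```java
import java.util.List;

/** Simplified (left, right] token range; wraparound ranges are deliberately not modelled. */
final class TokenRange
{
    final long left;
    final long right;

    TokenRange(long left, long right)
    {
        this.left = left;
        this.right = right;
    }

    boolean contains(long token)
    {
        return token > left && token <= right;
    }
}

/** Hypothetical stand-in for RangeOwnHelper; expects sorted, non-overlapping ranges and ascending tokens. */
final class RangeOwnSketch
{
    private final List<TokenRange> normalizedRanges;
    private int rangeIndex = 0; // ranges before this index end before the tokens seen so far

    RangeOwnSketch(List<TokenRange> normalizedRanges)
    {
        this.normalizedRanges = normalizedRanges;
    }

    /** Boolean variant: reports ownership instead of throwing. */
    boolean validate(long token)
    {
        while (rangeIndex < normalizedRanges.size())
        {
            TokenRange range = normalizedRanges.get(rangeIndex);
            if (range.contains(token))
                return true;
            if (token <= range.left)
                return false; // token falls in the gap before this range
            rangeIndex++;     // token lies past this range, try the next one
        }
        return false;         // token lies past the last range
    }

    /** Throwing variant, kept as a thin wrapper for callers that want an exception. */
    void check(long token)
    {
        if (!validate(token))
            throw new RuntimeException("Token " + token + " is not contained in the given ranges");
    }

    /** The caller no longer needs a try/catch around every key. */
    static boolean fullyContainedIn(List<TokenRange> normalizedRanges, long[] ascendingTokens)
    {
        if (normalizedRanges == null)
            return false;
        RangeOwnSketch helper = new RangeOwnSketch(normalizedRanges);
        for (long token : ascendingTokens)
            if (!helper.validate(token))
                return false;
        return true;
    }
}
```

Keeping `check` as a wrapper around `validate` means existing callers that rely on the exception would not need to change.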
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552625#comment-16552625 ] Aleksey Yeschenko commented on CASSANDRA-14556: --- [~jjirsa] It's already there, see {{StatsMetadata.hasLegacyCounterShards}}, as mentioned in the previous comment.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16552157#comment-16552157 ] Jeff Jirsa commented on CASSANDRA-14556: In the future (not this patch), would it make sense to add presence of legacy shards to sstable metadata? Would let us potentially take this path more often, and maybe we can use it for the eventual ticket where we'll clean them up.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551818#comment-16551818 ] Aleksey Yeschenko commented on CASSANDRA-14556: --- Just to be clear - what I want to look at and verify is that there are no broader interactions missed here, things that someone new to the C* codebase wouldn't necessarily notice or consider. I do trust Ariel's review of the lower level logic and don't intend to do a thorough review of that.
After a very quick skim, off the top of my head: the patch breaks counters if pre-2.0 shards are still around (and there is nothing there to remove them, so it's just a matter of the table having lived long enough). The receiving side needs to convert received local shards into remote ones, to not break reconcile logic. See [here|https://github.com/apache/cassandra/blob/645d8278bcf6281c8272f82d0d661e386a7cbe7d/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java#L265-L273]. As such, {{shouldStreamFullSSTable()}} should be modified to only return true for counter tables if there are no legacy shards present (see {{StatsMetadata.hasLegacyCounterShards}}).
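The condition described in this comment can be written out as a small predicate. The sketch below is a guess at its shape, not the committed code: the class and parameter names are hypothetical, and the four inputs are passed as plain booleans instead of being read from the real session, table metadata and `StatsMetadata.hasLegacyCounterShards`.

```java
/** Hypothetical eligibility check; inputs are plain booleans so the rule can be read in isolation. */
final class FullStreamEligibilitySketch
{
    static boolean shouldStreamFullSSTable(boolean fullTransfersEnabled,
                                           boolean fullyContained,
                                           boolean isCounterTable,
                                           boolean hasLegacyCounterShards)
    {
        // Counter SSTables that still carry legacy (pre-2.0) shards must go through the
        // regular path, where the receiver converts local shards into remote ones.
        boolean counterSafe = !isCounterTable || !hasLegacyCounterShards;
        return fullTransfersEnabled && fullyContained && counterSafe;
    }
}
```

Non-counter tables are unaffected by the extra term, so the fast path is only given up in the one case where the receiving side has to rewrite shard contents.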
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545581#comment-16545581 ] Aleksey Yeschenko commented on CASSANDRA-14556: --- [~aweisberg] sure!
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545576#comment-16545576 ] Ariel Weisberg commented on CASSANDRA-14556: [~iamaleksey] can you give it a final once over?
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545510#comment-16545510 ] Ariel Weisberg commented on CASSANDRA-14556: I am +1 on this. I'll commit it tomorrow unless someone else wants to give it another look. FYI the dtest is [here|https://github.com/apache/cassandra-dtest/compare/master...dineshjoshi:faster-streaming-rev2]
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545474#comment-16545474 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user aweisberg commented on the issue: https://github.com/apache/cassandra/pull/239 Where is the dtest? The existing link doesn't seem to work, and when I looked at your repo there were no branches with these tests.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543921#comment-16543921 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @aweisberg I have updated the dtest to test with different compaction strategies and addressed remaining comments.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543873#comment-16543873 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r202493566
--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
@@ -114,13 +153,54 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version)
         CassandraStreamHeader.serializer.serialize(header, out, version);
         out.flush();

-        CassandraStreamWriter writer = header.compressionInfo == null ?
-                                       new CassandraStreamWriter(sstable, header.sections, session) :
-                                       new CompressedCassandraStreamWriter(sstable, header.sections,
-                                                                           header.compressionInfo, session);
+        IStreamWriter writer;
+        if (shouldStreamFullSSTable())
+        {
+            writer = new CassandraBlockStreamWriter(sstable, session, components);
+        }
+        else
+        {
+            writer = (header.compressionInfo == null) ?
+                     new CassandraStreamWriter(sstable, header.sections, session) :
+                     new CompressedCassandraStreamWriter(sstable, header.sections,
+                                                         header.compressionInfo, session);
+        }
         writer.write(out);
     }

+    @VisibleForTesting
+    public boolean shouldStreamFullSSTable()
+    {
+        return isFullSSTableTransfersEnabled && isFullyContained;
+    }
+
+    @VisibleForTesting
+    public boolean fullyContainedIn(List<Range<Token>> requestedRanges, SSTableReader sstable)
+    {
+        if (requestedRanges == null)
+            return false;
+        try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
+        {
+            while (iter.hasNext())
+            {
+                DecoratedKey key = iter.next();
+                boolean foundFlag = false;
+                for (Range<Token> r : requestedRanges)
--- End diff --
I refactored it to use `RangeOwnHelper`.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543863#comment-16543863 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r202492216
--- Diff: conf/cassandra.yaml ---
@@ -784,6 +784,15 @@ compaction_throughput_mb_per_sec: 16
 # between the sstables, reducing page cache churn and keeping hot rows hot
 sstable_preemptive_open_interval_in_mb: 50

+# When enabled, permits Cassandra to zero copy eligible SSTables between
--- End diff --
Updated.
[jira] [Commented] (CASSANDRA-14556) Optimize streaming path in Cassandra
[ https://issues.apache.org/jira/browse/CASSANDRA-14556?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16543857#comment-16543857 ] ASF GitHub Bot commented on CASSANDRA-14556: Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r202491699
--- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -249,4 +250,42 @@ public ByteBufAllocator getAllocator()
     {
         return channelConfig.getAllocator();
     }
+
+    /**
+     * Consumes bytes in the stream until the given length
+     *
+     * @param writer
+     * @param len
+     * @return
+     * @throws IOException
+     */
+    public long consumeUntil(BufferedDataOutputStreamPlus writer, long len) throws IOException
+    {
+        long copied = 0; // number of bytes copied
+        while (copied < len)
+        {
+            if (buffer.remaining() == 0)
+            {
+                try
+                {
+                    reBuffer();
+                }
+                catch (EOFException e)
+                {
+                    throw new EOFException("EOF after " + copied + " bytes out of " + len);
+                }
+                if (buffer.remaining() == 0)
+                    return copied == 0 ? -1 : copied;
--- End diff --
Ok, makes sense. I have updated it to throw an `AssertionError`.
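For readers without the final patch at hand, the behaviour agreed on here can be illustrated with plain `java.io` streams. The sketch below is an analogy under stated assumptions, not the actual `RebufferingByteBufDataInputPlus` code: an `InputStream` stands in for the rebuffering input, the 8-byte chunk size is arbitrary, and "a refill that yields no bytes without signalling EOF" is treated as a broken invariant and surfaced as an `AssertionError` rather than as a `-1` return value.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

final class ConsumeUntilSketch
{
    /** Copies exactly len bytes from in to out, or fails; never returns a short count. */
    static long consumeUntil(InputStream in, OutputStream out, long len) throws IOException
    {
        byte[] chunk = new byte[8]; // deliberately tiny so several refills happen
        long copied = 0;            // number of bytes copied so far
        while (copied < len)
        {
            int toRead = (int) Math.min(chunk.length, len - copied);
            int read = in.read(chunk, 0, toRead);
            if (read < 0)
                throw new EOFException("EOF after " + copied + " bytes out of " + len);
            if (read == 0)
                // A positive-length read must block until data or EOF arrives,
                // so an empty refill indicates a bug rather than a normal condition.
                throw new AssertionError("Refill produced no bytes without signalling EOF");
            out.write(chunk, 0, read);
            copied += read;
        }
        return copied;
    }
}
```

Callers then have only two outcomes to handle: the full length was copied, or an exception was thrown.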