[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I made a few more changes - 1. Got rid of `IStreamWriter` 2. Ensured we're logging the configuration warning only once at startup iff zero-copy streaming is enabled 3. A few stylistic changes --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205646170 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public final class ComponentManifest implements Iterable +{ +private final LinkedHashMap components; + +public ComponentManifest(Map components) +{ +this.components = new LinkedHashMap<>(components); +} + +public long sizeOf(Component component) +{ +Long size = components.get(component); +if (size == null) +throw new IllegalArgumentException("Component " + component + " is not present in the manifest"); +return size; +} + +public long totalSize() +{ +long totalSize = 0; +for (Long size : components.values()) +totalSize += size; +return totalSize; +} + +public List components() +{ +return new ArrayList<>(components.keySet()); +} + +@Override +public boolean equals(Object o) +{ +if (this == o) +return true; + +if (!(o instanceof ComponentManifest)) +return false; + +ComponentManifest that = (ComponentManifest) o; +return components.equals(that.components); +} + +@Override +public int hashCode() +{ +return components.hashCode(); +} + +public static final IVersionedSerializer serializer = new IVersionedSerializer() +{ +public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException +{ +out.writeUnsignedVInt(manifest.components.size()); +for (Map.Entry entry : manifest.components.entrySet()) +{ +out.writeByte(entry.getKey().type.id); --- End diff -- Done. I'm just using `component.name`. I think this should be sufficient for this PR's scope. 
--- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205639791 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); + +private final TableId tableId; +private final StreamSession session; +private final CassandraStreamHeader header; +private final int fileSequenceNumber; + +public CassandraBlockStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(messageHeader.pendingRepair)) +throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId)); +} + +this.header = streamHeader; +this.session = session; +this.tableId = messageHeader.tableId; +this.fileSequenceNumber = messageHeader.sequenceNumber; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); +if (cfs == null) +{ +// schema was dropped during streaming +throw new IOException("Table " + tableId + " was dropped during streaming"); +} + +ComponentManifest manifest = header.componentManifest; +long totalSize = manifest.totalSize(); + +logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", + session.planId(), + fileSequenceNumber, + session.peer, + prettyPrintMemory(totalSize), + cfs.metadata()); + +BigTableBlockWriter writer = null; --- End diff -- Done --- -
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205599465 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public final class ComponentManifest implements Iterable +{ +private final LinkedHashMap components; + +public ComponentManifest(Map components) +{ +this.components = new LinkedHashMap<>(components); +} + +public long sizeOf(Component component) +{ +Long size = components.get(component); +if (size == null) +throw new IllegalArgumentException("Component " + component + " is not present in the manifest"); +return size; +} + +public long totalSize() +{ +long totalSize = 0; +for (Long size : components.values()) +totalSize += size; +return totalSize; +} + +public List components() +{ +return new ArrayList<>(components.keySet()); +} + +@Override +public boolean equals(Object o) +{ +if (this == o) +return true; + +if (!(o instanceof ComponentManifest)) +return false; + +ComponentManifest that = (ComponentManifest) o; +return components.equals(that.components); +} + +@Override +public int hashCode() +{ +return components.hashCode(); +} + +public static final IVersionedSerializer serializer = new IVersionedSerializer() +{ +public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException +{ +out.writeUnsignedVInt(manifest.components.size()); +for (Map.Entry entry : manifest.components.entrySet()) +{ +out.writeByte(entry.getKey().type.id); --- End diff -- FWIW, I realize that for most components this will be a bit redundant. 
Technically it's sufficient to just store `component.name`, and get the full `Component` via `Component.parse()`. If you don't like redundancy and want to do it that way, that's perfectly fine too - I'm cool with either option. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205587936 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); + +private final TableId tableId; +private final StreamSession session; +private final CassandraStreamHeader header; +private final int fileSequenceNumber; + +public CassandraBlockStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(messageHeader.pendingRepair)) +throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId)); +} + +this.header = streamHeader; +this.session = session; +this.tableId = messageHeader.tableId; +this.fileSequenceNumber = messageHeader.sequenceNumber; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); +if (cfs == null) +{ +// schema was dropped during streaming +throw new IOException("Table " + tableId + " was dropped during streaming"); +} + +ComponentManifest manifest = header.componentManifest; +long totalSize = manifest.totalSize(); + +logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", + session.planId(), + fileSequenceNumber, + session.peer, + prettyPrintMemory(totalSize), + cfs.metadata()); + +BigTableBlockWriter writer = null; --- End diff -- It's likely that we'll have more formats in near future. I don't suggest we make
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205586651 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public final class ComponentManifest implements Iterable +{ +private final LinkedHashMap components; + +public ComponentManifest(Map components) +{ +this.components = new LinkedHashMap<>(components); +} + +public long sizeOf(Component component) +{ +Long size = components.get(component); +if (size == null) +throw new IllegalArgumentException("Component " + component + " is not present in the manifest"); +return size; +} + +public long totalSize() +{ +long totalSize = 0; +for (Long size : components.values()) +totalSize += size; +return totalSize; +} + +public List components() +{ +return new ArrayList<>(components.keySet()); +} + +@Override +public boolean equals(Object o) +{ +if (this == o) +return true; + +if (!(o instanceof ComponentManifest)) +return false; + +ComponentManifest that = (ComponentManifest) o; +return components.equals(that.components); +} + +@Override +public int hashCode() +{ +return components.hashCode(); +} + +public static final IVersionedSerializer serializer = new IVersionedSerializer() +{ +public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException +{ +out.writeUnsignedVInt(manifest.components.size()); +for (Map.Entry entry : manifest.components.entrySet()) +{ +out.writeByte(entry.getKey().type.id); --- End diff -- Talked to @dineshjoshi offline, and we realised that this is incomplete - and neither was my proposed version. 
For completeness, we want to serialize the whole component info, not just its type. And it has two important fields - type and name. Name will usually be derived from the type, but not always. And even though we don't support streaming those components (custom and SI), we might want to change it in the future, and the protocol should allow it. So I suggest we encode `component.type.name()`, the full enum name, followed by `component.name()`. It's a little heavier, but this is completely irrelevant in the big picture, size-wise. The upside is that we can encode/decode any component necessary in the future, loss-free. And, again, we don't really need to assign ids. `valueOf()` is plenty good, and allows extension without overlap risk like in `Verb`. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #236: 9608 trunk
Github user jasobrown commented on the issue: https://github.com/apache/cassandra/pull/236 Committed as sha `6ba2fb9395226491872b41312d978a169f36fcdb` --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #236: 9608 trunk
Github user jasobrown closed the pull request at: https://github.com/apache/cassandra/pull/236 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532095 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.totalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", + session.planId(), + sstable.getFilename(), + session.peer, + sstable.getSSTableMetadata().repairedAt, + prettyPrintMemory(totalSize)); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.components()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), + sstable.getKeyspaceName(), + sstable.getColumnFamilyName(), + sstable.descriptor.generation, + component, length); --- End diff -- Fixed. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532049 --- Diff: test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java --- @@ -43,8 +51,38 @@ public void serializerTest() new ArrayList<>(), ((CompressionMetadata) null), 0, - SerializationHeader.makeWithoutStats(metadata).toComponent()); + SerializationHeader.makeWithoutStats(metadata).toComponent(), + metadata.id); SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer); } + +@Test +public void serializerTest_FullSSTableTransfer() +{ +String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)"; +TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build(); + +ComponentManifest manifest = new ComponentManifest(new HashMap(ImmutableMap.of(Component.DATA, 100L))); --- End diff -- Fixed. Not sure why I did this in the first place. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532001 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws I sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); int sstableLevel = in.readInt(); + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); -return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); +TableId tableId = TableId.deserialize(in); +boolean fullStream = in.readBoolean(); +ComponentManifest manifest = null; +DecoratedKey firstKey = null; + +if (fullStream) +{ +manifest = ComponentManifest.serializer.deserialize(in, version); +ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in); +IPartitioner partitioner = partitionerMapper.apply(tableId); +if (partitioner == null) +throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId {}", tableId)); --- End diff -- Fixed. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra-dtest pull request #31: CASSANDRA-9608 fix jmxutils.py
Github user snazy closed the pull request at: https://github.com/apache/cassandra-dtest/pull/31 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra-dtest issue #31: CASSANDRA-9608 fix jmxutils.py
Github user snazy commented on the issue: https://github.com/apache/cassandra-dtest/pull/31 Thanks! Committed as f45a06b2efd08e9971d29b0e15c9ba388e4ae6bd --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205504297 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -65,18 +85,43 @@ private CassandraStreamHeader(Version version, SSTableFormat.Type format, long e this.compressionInfo = compressionInfo; this.sstableLevel = sstableLevel; this.header = header; - +this.fullStream = fullStream; +this.componentManifest = componentManifest; +this.firstKey = firstKey; +this.tableId = tableId; this.size = calculateSize(); } -public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header) +private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, --- End diff -- The introduction of the new fields and constructors got us to 5 constructors total with up to 10 arguments, which is no longer manageable, and calls for a builder. It's boring and tedious work, so I did it myself and pushed here - https://github.com/iamaleksey/cassandra/commit/321d21747faa46afcf34518ebdeb811f2a805de8 - please feel free to cherry-pick. In addition to introducing the builder, the commit renames `fullStream` to something a bit more meaningful (`isEntireSSTable`) that clearly reflects what's actually happening, fixes a bug in `serializedSize()` where compression info isn't initialized, and removes some fields without `toString()` implementations from header's own `toString()`. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205450061 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.totalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", + session.planId(), + sstable.getFilename(), + session.peer, + sstable.getSSTableMetadata().repairedAt, + prettyPrintMemory(totalSize)); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.components()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), + sstable.getKeyspaceName(), + sstable.getColumnFamilyName(), + sstable.descriptor.generation, + component, length); --- End diff -- `prettyPrintMemory()` missing here for `length`. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205445354 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws I sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); int sstableLevel = in.readInt(); + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); -return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); +TableId tableId = TableId.deserialize(in); +boolean fullStream = in.readBoolean(); +ComponentManifest manifest = null; +DecoratedKey firstKey = null; + +if (fullStream) +{ +manifest = ComponentManifest.serializer.deserialize(in, version); +ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in); +IPartitioner partitioner = partitionerMapper.apply(tableId); +if (partitioner == null) +throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId {}", tableId)); --- End diff -- Another instance of `String.format()` format string with `{}` instead of `%s`, looks like. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user iamaleksey commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205416649 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -48,51 +47,61 @@ import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadataRef; +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter { +private static final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + private final TableMetadataRef metadata; -private final LifecycleTransaction txn; private volatile SSTableReader finalReader; private final Map componentWriters; -private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); - -private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() - .trickleFsync(false) - .bufferSize(2 * 1024 * 1024) - .bufferType(BufferType.OFF_HEAP) - .build(); -public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, - Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY, - Component.DIGEST, Component.CRC); +private static final SequentialWriterOption WRITER_OPTION = +SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 << 20) + .bufferType(BufferType.OFF_HEAP) + .build(); + +private static final ImmutableSet SUPPORTED_COMPONENTS = +ImmutableSet.of(Component.DATA, +Component.PRIMARY_INDEX, +Component.SUMMARY, +Component.STATS, +Component.COMPRESSION_INFO, +Component.FILTER, +Component.DIGEST, +Component.CRC); public BigTableBlockWriter(Descriptor descriptor, TableMetadataRef metadata, LifecycleTransaction txn, final Set components) { -super(descriptor, ImmutableSet.copyOf(components), metadata, - DatabaseDescriptor.getDiskOptimizationStrategy()); +super(descriptor, 
ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy()); + txn.trackNew(this); this.metadata = metadata; -this.txn = txn; -this.componentWriters = new HashMap<>(components.size()); +this.componentWriters = new EnumMap<>(Component.Type.class); -assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", - new HashSet(components).removeAll(supportedComponents)); +if (!SUPPORTED_COMPONENTS.containsAll(components)) +throw new AssertionError(format("Unsupported streaming component detected %s", +Sets.difference(components, SUPPORTED_COMPONENTS))); --- End diff -- Neat. I either forgot, or didn't know that `Sets.difference()` was a thing. This is nicer than the way I proposed (: --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #236: 9608 trunk
Github user snazy commented on the issue: https://github.com/apache/cassandra/pull/236 No, it's no longer needed. I removed it. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #236: 9608 trunk
Github user jasobrown commented on the issue: https://github.com/apache/cassandra/pull/236 Is `javaexec.in.sh` needed anymore? It looks like all the Java checks are in `cassandra.in.sh` now. The file is not referenced by dtests or ccm, either. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I've addressed your comments, including the one about disabling faster streaming for legacy counter shards. I also added a much less expensive check for STCS. It won't classify every SSTable accurately, but it is much cheaper than the check I have for LCS. Let me know your thoughts. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205339231 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; --- End diff -- I did rename this. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org