[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/239
  
@iamaleksey made a few more changes - 

1. Got rid of `IStreamWriter`
2. Ensured we're logging the configuration warning only once at start up 
iff zero copy streaming is enabled
3. A few stylistic changes
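
For readers following along, item 2 can be pictured with a minimal sketch. The class and method names below are hypothetical and are not taken from the pull request; the sketch only shows one common way to make sure a warning fires once, and only when the feature is enabled.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not the code from the pull request.
final class ZeroCopyConfigWarning
{
    // Flips to true the first time a warning is due.
    private static final AtomicBoolean warned = new AtomicBoolean(false);

    /**
     * Returns true only for the first call made while zero copy streaming is enabled,
     * so the caller can log the configuration warning exactly once.
     */
    static boolean shouldWarn(boolean zeroCopyStreamingEnabled)
    {
        return zeroCopyStreamingEnabled && warned.compareAndSet(false, true);
    }
}
```

A caller would wrap its `logger.warn(...)` call in `if (ZeroCopyConfigWarning.shouldWarn(enabled))` during startup.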


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205646170
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public final class ComponentManifest implements Iterable<Component>
+{
+private final LinkedHashMap<Component, Long> components;
+
+public ComponentManifest(Map<Component, Long> components)
+{
+this.components = new LinkedHashMap<>(components);
+}
+
+public long sizeOf(Component component)
+{
+Long size = components.get(component);
+if (size == null)
+throw new IllegalArgumentException("Component " + component + 
" is not present in the manifest");
+return size;
+}
+
+public long totalSize()
+{
+long totalSize = 0;
+for (Long size : components.values())
+totalSize += size;
+return totalSize;
+}
+
+public List<Component> components()
+{
+return new ArrayList<>(components.keySet());
+}
+
+@Override
+public boolean equals(Object o)
+{
+if (this == o)
+return true;
+
+if (!(o instanceof ComponentManifest))
+return false;
+
+ComponentManifest that = (ComponentManifest) o;
+return components.equals(that.components);
+}
+
+@Override
+public int hashCode()
+{
+return components.hashCode();
+}
+
+public static final IVersionedSerializer<ComponentManifest> serializer 
= new IVersionedSerializer<ComponentManifest>()
+{
+public void serialize(ComponentManifest manifest, DataOutputPlus 
out, int version) throws IOException
+{
+out.writeUnsignedVInt(manifest.components.size());
+for (Map.Entry<Component, Long> entry : 
manifest.components.entrySet())
+{
+out.writeByte(entry.getKey().type.id);
--- End diff --

Done. I'm just using `component.name`. I think this should be sufficient 
for this PR's scope.





[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205639791
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+
+private final TableId tableId;
+private final StreamSession session;
+private final CassandraStreamHeader header;
+private final int fileSequenceNumber;
+
+public CassandraBlockStreamReader(StreamMessageHeader messageHeader, 
CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair sstables if 
the session has a pending repair id
+if 
(!session.getPendingRepair().equals(messageHeader.pendingRepair))
+throw new IllegalStateException(format("Stream Session & 
SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId));
+}
+
+this.header = streamHeader;
+this.session = session;
+this.tableId = messageHeader.tableId;
+this.fileSequenceNumber = messageHeader.sequenceNumber;
+}
+
+/**
+ * @param inputPlus where this reads data from
+ * @return SSTable transferred
+ * @throws IOException if reading the remote sstable fails. Will throw 
an RTE if local write fails.
+ */
+@SuppressWarnings("resource") // input needs to remain open, streams 
on top of it can't be closed
+@Override
+public SSTableMultiWriter read(DataInputPlus inputPlus) throws 
IOException
+{
+ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+if (cfs == null)
+{
+// schema was dropped during streaming
+throw new IOException("Table " + tableId + " was dropped 
during streaming");
+}
+
+ComponentManifest manifest = header.componentManifest;
+long totalSize = manifest.totalSize();
+
+logger.debug("[Stream #{}] Started receiving sstable #{} from {}, 
size = {}, table = {}",
+ session.planId(),
+ fileSequenceNumber,
+ session.peer,
+ prettyPrintMemory(totalSize),
+ cfs.metadata());
+
+BigTableBlockWriter writer = null;
--- End diff --

Done



[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread iamaleksey
Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205599465
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public final class ComponentManifest implements Iterable<Component>
+{
+private final LinkedHashMap<Component, Long> components;
+
+public ComponentManifest(Map<Component, Long> components)
+{
+this.components = new LinkedHashMap<>(components);
+}
+
+public long sizeOf(Component component)
+{
+Long size = components.get(component);
+if (size == null)
+throw new IllegalArgumentException("Component " + component + 
" is not present in the manifest");
+return size;
+}
+
+public long totalSize()
+{
+long totalSize = 0;
+for (Long size : components.values())
+totalSize += size;
+return totalSize;
+}
+
+public List<Component> components()
+{
+return new ArrayList<>(components.keySet());
+}
+
+@Override
+public boolean equals(Object o)
+{
+if (this == o)
+return true;
+
+if (!(o instanceof ComponentManifest))
+return false;
+
+ComponentManifest that = (ComponentManifest) o;
+return components.equals(that.components);
+}
+
+@Override
+public int hashCode()
+{
+return components.hashCode();
+}
+
+public static final IVersionedSerializer<ComponentManifest> serializer 
= new IVersionedSerializer<ComponentManifest>()
+{
+public void serialize(ComponentManifest manifest, DataOutputPlus 
out, int version) throws IOException
+{
+out.writeUnsignedVInt(manifest.components.size());
+for (Map.Entry<Component, Long> entry : 
manifest.components.entrySet())
+{
+out.writeByte(entry.getKey().type.id);
--- End diff --

FWIW, I realize that for most components this will be a bit redundant. 
Technically it's sufficient to just store `component.name`, and get the full 
`Component` via `Component.parse()`. If you don't like redundancy and want to 
do it that way, that's perfectly fine too - I'm cool with either option.





[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread iamaleksey
Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205587936
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+
+private final TableId tableId;
+private final StreamSession session;
+private final CassandraStreamHeader header;
+private final int fileSequenceNumber;
+
+public CassandraBlockStreamReader(StreamMessageHeader messageHeader, 
CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair sstables if 
the session has a pending repair id
+if 
(!session.getPendingRepair().equals(messageHeader.pendingRepair))
+throw new IllegalStateException(format("Stream Session & 
SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId));
+}
+
+this.header = streamHeader;
+this.session = session;
+this.tableId = messageHeader.tableId;
+this.fileSequenceNumber = messageHeader.sequenceNumber;
+}
+
+/**
+ * @param inputPlus where this reads data from
+ * @return SSTable transferred
+ * @throws IOException if reading the remote sstable fails. Will throw 
an RTE if local write fails.
+ */
+@SuppressWarnings("resource") // input needs to remain open, streams 
on top of it can't be closed
+@Override
+public SSTableMultiWriter read(DataInputPlus inputPlus) throws 
IOException
+{
+ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+if (cfs == null)
+{
+// schema was dropped during streaming
+throw new IOException("Table " + tableId + " was dropped 
during streaming");
+}
+
+ComponentManifest manifest = header.componentManifest;
+long totalSize = manifest.totalSize();
+
+logger.debug("[Stream #{}] Started receiving sstable #{} from {}, 
size = {}, table = {}",
+ session.planId(),
+ fileSequenceNumber,
+ session.peer,
+ prettyPrintMemory(totalSize),
+ cfs.metadata());
+
+BigTableBlockWriter writer = null;
--- End diff --

It's likely that we'll have more formats in near future. I don't suggest we 
make 

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread iamaleksey
Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205586651
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public final class ComponentManifest implements Iterable<Component>
+{
+private final LinkedHashMap<Component, Long> components;
+
+public ComponentManifest(Map<Component, Long> components)
+{
+this.components = new LinkedHashMap<>(components);
+}
+
+public long sizeOf(Component component)
+{
+Long size = components.get(component);
+if (size == null)
+throw new IllegalArgumentException("Component " + component + 
" is not present in the manifest");
+return size;
+}
+
+public long totalSize()
+{
+long totalSize = 0;
+for (Long size : components.values())
+totalSize += size;
+return totalSize;
+}
+
+public List<Component> components()
+{
+return new ArrayList<>(components.keySet());
+}
+
+@Override
+public boolean equals(Object o)
+{
+if (this == o)
+return true;
+
+if (!(o instanceof ComponentManifest))
+return false;
+
+ComponentManifest that = (ComponentManifest) o;
+return components.equals(that.components);
+}
+
+@Override
+public int hashCode()
+{
+return components.hashCode();
+}
+
+public static final IVersionedSerializer<ComponentManifest> serializer 
= new IVersionedSerializer<ComponentManifest>()
+{
+public void serialize(ComponentManifest manifest, DataOutputPlus 
out, int version) throws IOException
+{
+out.writeUnsignedVInt(manifest.components.size());
+for (Map.Entry<Component, Long> entry : 
manifest.components.entrySet())
+{
+out.writeByte(entry.getKey().type.id);
--- End diff --

Talked to @dineshjoshi offline, and we realised that this is incomplete - 
and so was my proposed version. For completeness, we want to serialize 
the whole component info, not just its type. It has two important fields - 
type and name. Name will usually be derived from the type, but not always. And 
even though we don't support streaming those components (custom and SI), we 
might want to change that in the future, and the protocol should allow it.

So I suggest we encode `component.type.name()`, the full enum name, followed 
by `component.name()`. It's a little heavier, but this is completely irrelevant 
in the big picture, size-wise.

The upside is that we can encode and decode any component necessary in 
the future, loss-free. And, again, we don't really need to assign ids. 
`valueOf()` is plenty good, and allows extension without overlap risk like in 
`Verb`.
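
A rough sketch of the suggested encoding, for illustration only. The `Type` enum and `Component` class below are simplified stand-ins invented for this example, and `writeUTF`/`readUTF` from `java.io` are used in place of Cassandra's `DataOutputPlus`/`DataInputPlus`; the actual serializer in the PR may look different.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-ins for Component and Component.Type; not the Cassandra classes.
final class ComponentCodecSketch
{
    enum Type { DATA, PRIMARY_INDEX, SECONDARY_INDEX, CUSTOM }

    static final class Component
    {
        final Type type;
        final String name;

        Component(Type type, String name)
        {
            this.type = type;
            this.name = name;
        }
    }

    // Writes the full enum name followed by the component name; no numeric ids involved.
    static void serialize(Component component, DataOutputStream out) throws IOException
    {
        out.writeUTF(component.type.name());
        out.writeUTF(component.name);
    }

    // Reads the two strings back and resolves the type with valueOf().
    static Component deserialize(DataInputStream in) throws IOException
    {
        Type type = Type.valueOf(in.readUTF());
        String name = in.readUTF();
        return new Component(type, name);
    }

    // Convenience for trying the codec in memory.
    static Component roundTrip(Component component)
    {
        try
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(component, new DataOutputStream(bytes));
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the type travels as its enum name, adding a new constant later does not require reserving an id, and a component whose name is not derivable from its type (the custom and SI cases mentioned above) survives the round trip unchanged.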





[GitHub] cassandra issue #236: 9608 trunk

2018-07-26 Thread jasobrown
Github user jasobrown commented on the issue:

https://github.com/apache/cassandra/pull/236
  
Committed as sha `6ba2fb9395226491872b41312d978a169f36fcdb`





[GitHub] cassandra pull request #236: 9608 trunk

2018-07-26 Thread jasobrown
Github user jasobrown closed the pull request at:

https://github.com/apache/cassandra/pull/236





[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205532095
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession 
session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * 
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.totalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}",
+ session.planId(),
+ sstable.getFilename(),
+ session.peer,
+ sstable.getSSTableMetadata().repairedAt,
+ prettyPrintMemory(totalSize));
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = 
(ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.components())
+{
+@SuppressWarnings("resource") // this is closed after the file 
is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new 
RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+logger.debug("[Stream #{}] Block streaming {}.{} gen {} 
component {} size {}", session.planId(),
+ sstable.getKeyspaceName(),
+ sstable.getColumnFamilyName(),
+ sstable.descriptor.generation,
+ component, length);
--- End diff --

Fixed.





[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205532049
  
--- Diff: 
test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java ---
@@ -43,8 +51,38 @@ public void serializerTest()
  new 
ArrayList<>(),
  
((CompressionMetadata) null),
  0,
- 
SerializationHeader.makeWithoutStats(metadata).toComponent());
+ 
SerializationHeader.makeWithoutStats(metadata).toComponent(),
+ 
metadata.id);
 
 SerializationUtils.assertSerializationCycle(header, 
CassandraStreamHeader.serializer);
 }
+
+@Test
+public void serializerTest_FullSSTableTransfer()
+{
+String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
+TableMetadata metadata = CreateTableStatement.parse(ddl, 
"ks").build();
+
+ComponentManifest manifest = new ComponentManifest(new 
HashMap(ImmutableMap.of(Component.DATA, 100L)));
--- End diff --

Fixed. Not sure why I did this in the first place.





[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205532001
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java ---
@@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus 
in, int version) throws I
 sections.add(new 
SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong()));
 CompressionInfo compressionInfo = 
CompressionInfo.serializer.deserialize(in, version);
 int sstableLevel = in.readInt();
+
 SerializationHeader.Component header =  
SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-return new CassandraStreamHeader(sstableVersion, format, 
estimatedKeys, sections, compressionInfo, sstableLevel, header);
+TableId tableId = TableId.deserialize(in);
+boolean fullStream = in.readBoolean();
+ComponentManifest manifest = null;
+DecoratedKey firstKey = null;
+
+if (fullStream)
+{
+manifest = ComponentManifest.serializer.deserialize(in, 
version);
+ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in);
+IPartitioner partitioner = 
partitionerMapper.apply(tableId);
+if (partitioner == null)
+throw new 
IllegalArgumentException(String.format("Could not determine partitioner for 
tableId {}", tableId));
--- End diff --

Fixed.
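
The reply does not say what was changed. One thing visible in the quoted line is that it passes an SLF4J-style `{}` placeholder to `String.format`, which only understands `%`-style specifiers. The sketch below (class and method names are made up for this example) shows the difference; whether this is what the fix addressed is an assumption.

```java
// Hypothetical demo class; only String.format itself is real API here.
final class FormatPlaceholderSketch
{
    // "{}" is not a format specifier, so the argument is ignored and the braces stay in the text.
    static String slf4jStyle(Object tableId)
    {
        return String.format("Could not determine partitioner for tableId {}", tableId);
    }

    // "%s" is substituted with String.valueOf(tableId).
    static String formatStyle(Object tableId)
    {
        return String.format("Could not determine partitioner for tableId %s", tableId);
    }
}
```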





[GitHub] cassandra-dtest pull request #31: CASSANDRA-9608 fix jmxutils.py

2018-07-26 Thread snazy
Github user snazy closed the pull request at:

https://github.com/apache/cassandra-dtest/pull/31





[GitHub] cassandra-dtest issue #31: CASSANDRA-9608 fix jmxutils.py

2018-07-26 Thread snazy
Github user snazy commented on the issue:

https://github.com/apache/cassandra-dtest/pull/31
  
Thanks!

Committed as f45a06b2efd08e9971d29b0e15c9ba388e4ae6bd





[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread iamaleksey
Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205504297
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java ---
@@ -65,18 +85,43 @@ private CassandraStreamHeader(Version version, 
SSTableFormat.Type format, long e
 this.compressionInfo = compressionInfo;
 this.sstableLevel = sstableLevel;
 this.header = header;
-
+this.fullStream = fullStream;
+this.componentManifest = componentManifest;
+this.firstKey = firstKey;
+this.tableId = tableId;
 this.size = calculateSize();
 }
 
-public CassandraStreamHeader(Version version, SSTableFormat.Type 
format, long estimatedKeys, List 
sections, CompressionMetadata compressionMetadata, int sstableLevel, 
SerializationHeader.Component header)
+private CassandraStreamHeader(Version version, SSTableFormat.Type 
format, long estimatedKeys,
--- End diff --

The introduction of the new fields and constructors got us to 5 
constructors total with up to 10 arguments, which is no longer manageable, and 
calls for a builder. It's boring and tedious work, so I did it myself and 
pushed here - 

https://github.com/iamaleksey/cassandra/commit/321d21747faa46afcf34518ebdeb811f2a805de8
 - please feel free to cherry-pick.

In addition to introducing the builder, the commit renames `fullStream` to 
something a bit more meaningful (`isEntireSSTable`) that clearly reflects 
what's actually happening, fixes a bug in `serializedSize()` where compression 
info isn't initialized, and removes some fields without `toString()` 
implementations from header's own `toString()`.
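
For anyone who has not opened the linked commit, here is a minimal sketch of the builder shape being described. It is not the code from that commit: the class name, the reduced field set, and the `with...` method names are assumptions chosen for illustration, with only `isEntireSSTable` taken from the comment above.

```java
// Hypothetical, heavily reduced stand-in for a stream header with a builder.
final class StreamHeaderSketch
{
    final String sstableVersion;
    final long estimatedKeys;
    final int sstableLevel;
    final boolean isEntireSSTable;

    // Single private constructor; all callers go through the builder.
    private StreamHeaderSketch(Builder builder)
    {
        this.sstableVersion = builder.sstableVersion;
        this.estimatedKeys = builder.estimatedKeys;
        this.sstableLevel = builder.sstableLevel;
        this.isEntireSSTable = builder.isEntireSSTable;
    }

    static Builder builder()
    {
        return new Builder();
    }

    static final class Builder
    {
        private String sstableVersion;
        private long estimatedKeys;
        private int sstableLevel;
        private boolean isEntireSSTable;

        Builder withSSTableVersion(String sstableVersion)
        {
            this.sstableVersion = sstableVersion;
            return this;
        }

        Builder withEstimatedKeys(long estimatedKeys)
        {
            this.estimatedKeys = estimatedKeys;
            return this;
        }

        Builder withSSTableLevel(int sstableLevel)
        {
            this.sstableLevel = sstableLevel;
            return this;
        }

        Builder isEntireSSTable(boolean isEntireSSTable)
        {
            this.isEntireSSTable = isEntireSSTable;
            return this;
        }

        // Validation lives in one place instead of being repeated per constructor.
        StreamHeaderSketch build()
        {
            if (sstableVersion == null)
                throw new IllegalStateException("sstableVersion is required");
            return new StreamHeaderSketch(this);
        }
    }
}
```

Call sites then name each argument, e.g. `StreamHeaderSketch.builder().withSSTableVersion("na").isEntireSSTable(true).build()`, which is easier to read than a ten-argument constructor call.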





[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread iamaleksey
Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205450061
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * 
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.totalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}",
+ session.planId(),
+ sstable.getFilename(),
+ session.peer,
+ sstable.getSSTableMetadata().repairedAt,
+ prettyPrintMemory(totalSize));
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.components())
+{
+@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(),
+ sstable.getKeyspaceName(),
+ sstable.getColumnFamilyName(),
+ sstable.descriptor.generation,
+ component, length);
--- End diff --

`prettyPrintMemory()` missing here for `length`.
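
A minimal standalone sketch of what the comment asks for: format the component size before it reaches the log line, as the diff already does for `totalSize`. The `humanReadable()` helper below is a hypothetical stand-in written for this example; it is not `FBUtilities.prettyPrintMemory()` and makes no claim to match its output format.

```java
import java.util.Locale;

// Standalone sketch. humanReadable() is a hypothetical stand-in for
// FBUtilities.prettyPrintMemory(); its output format is an assumption.
final class ComponentSizeLogDemo
{
    // Converts a raw byte count into a short binary-unit string.
    static String humanReadable(long bytes)
    {
        if (bytes < 1024)
            return bytes + "B";

        String[] units = { "KiB", "MiB", "GiB", "TiB" };
        double value = bytes;
        int unit = -1;
        while (value >= 1024 && unit < units.length - 1)
        {
            value /= 1024;
            unit++;
        }
        return String.format(Locale.ROOT, "%.3f%s", value, units[unit]);
    }

    // Builds the tail of the per-component log message with a formatted size.
    static String describe(String component, long length)
    {
        return "component " + component + " size " + humanReadable(length);
    }

    public static void main(String[] args)
    {
        System.out.println(describe("Data.db", 2L << 20));
    }
}
```

In the diff itself the change would presumably amount to passing `prettyPrintMemory(length)` instead of `length` as the last logger argument.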


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread iamaleksey
Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205445354
  
--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java ---
@@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws I
 sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong()));
 CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version);
 int sstableLevel = in.readInt();
+
 SerializationHeader.Component header =  SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header);
+TableId tableId = TableId.deserialize(in);
+boolean fullStream = in.readBoolean();
+ComponentManifest manifest = null;
+DecoratedKey firstKey = null;
+
+if (fullStream)
+{
+manifest = ComponentManifest.serializer.deserialize(in, version);
+ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in);
+IPartitioner partitioner = partitionerMapper.apply(tableId);
+if (partitioner == null)
+throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId {}", tableId));
--- End diff --

Another instance of `String.format()` format string with `{}` instead of 
`%s`, looks like.
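
A small standalone sketch of why this matters: `String.format()` uses printf-style specifiers, so an SLF4J-style `{}` is copied to the output verbatim and the surplus argument is silently ignored. The class and method names are made up for the example; only the message text comes from the diff.

```java
// Standalone sketch: contrasts an SLF4J-style placeholder with a printf-style
// specifier when both are passed to String.format().
final class FormatPlaceholderDemo
{
    // "{}" is not a format specifier, so tableId never appears in the result.
    static String withBraces(Object tableId)
    {
        return String.format("Could not determine partitioner for tableId {}", tableId);
    }

    // "%s" is substituted with String.valueOf(tableId).
    static String withSpecifier(Object tableId)
    {
        return String.format("Could not determine partitioner for tableId %s", tableId);
    }

    public static void main(String[] args)
    {
        System.out.println(withBraces("abc-123"));
        System.out.println(withSpecifier("abc-123"));
    }
}
```

No exception is thrown in the first case, which is why the mistake tends to surface only when someone reads the resulting error message.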


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread iamaleksey
Github user iamaleksey commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205416649
  
--- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -48,51 +47,61 @@
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadataRef;
 
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
 public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter
 {
+private static final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class);
+
 private final TableMetadataRef metadata;
-private final LifecycleTransaction txn;
 private volatile SSTableReader finalReader;
 private final Map componentWriters;
 
-private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class);
-
-private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
-   .trickleFsync(false)
-   .bufferSize(2 * 1024 * 1024)
-   .bufferType(BufferType.OFF_HEAP)
-   .build();
-public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
-   Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
-   Component.DIGEST, Component.CRC);
+private static final SequentialWriterOption WRITER_OPTION =
+SequentialWriterOption.newBuilder()
+  .trickleFsync(false)
+  .bufferSize(2 << 20)
+  .bufferType(BufferType.OFF_HEAP)
+  .build();
+
+private static final ImmutableSet SUPPORTED_COMPONENTS =
+ImmutableSet.of(Component.DATA,
+Component.PRIMARY_INDEX,
+Component.SUMMARY,
+Component.STATS,
+Component.COMPRESSION_INFO,
+Component.FILTER,
+Component.DIGEST,
+Component.CRC);
 
 public BigTableBlockWriter(Descriptor descriptor,
TableMetadataRef metadata,
LifecycleTransaction txn,
final Set components)
 {
-super(descriptor, ImmutableSet.copyOf(components), metadata,
-  DatabaseDescriptor.getDiskOptimizationStrategy());
+super(descriptor, ImmutableSet.copyOf(components), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
+
 txn.trackNew(this);
 this.metadata = metadata;
-this.txn = txn;
-this.componentWriters = new HashMap<>(components.size());
+this.componentWriters = new EnumMap<>(Component.Type.class);
 
-assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s",
-   new HashSet(components).removeAll(supportedComponents));
+if (!SUPPORTED_COMPONENTS.containsAll(components))
+throw new AssertionError(format("Unsupported streaming component detected %s",
+Sets.difference(components, SUPPORTED_COMPONENTS)));
--- End diff --

Neat. I either forgot, or didn't know that `Sets.difference()` was a thing. 
This is nicer than the way I proposed (:
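
For context, a standalone sketch of the difference between the removed and added lines. The removed assertion formatted the result of `removeAll()`, which is a `boolean`, so the message could never list the offending components. The sketch uses only the JDK to stay self-contained; the PR itself uses Guava's `Sets.difference()`, and the component names here are made-up strings rather than Cassandra `Component` values.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Standalone sketch using JDK collections only; the names are illustrative.
final class SetDifferenceDemo
{
    // Mirrors the removed line: removeAll() reports whether the set changed,
    // so the value handed to the message is a Boolean, not the leftovers.
    static Object viaRemoveAll(Set<String> components, Set<String> supported)
    {
        return new HashSet<String>(components).removeAll(supported);
    }

    // Computes the elements of components that are not in supported.
    // A TreeSet is used only to give the output a stable order.
    static Set<String> difference(Set<String> components, Set<String> supported)
    {
        Set<String> result = new TreeSet<String>(components);
        result.removeAll(supported);
        return result;
    }

    public static void main(String[] args)
    {
        Set<String> supported = new HashSet<String>(Arrays.asList("Data.db", "Index.db"));
        Set<String> components = new HashSet<String>(Arrays.asList("Data.db", "Index.db", "Bogus.db"));

        System.out.println(String.format("Unsupported streaming component detected %s",
                                         viaRemoveAll(components, supported)));
        System.out.println(String.format("Unsupported streaming component detected %s",
                                         difference(components, supported)));
    }
}
```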


---




[GitHub] cassandra issue #236: 9608 trunk

2018-07-26 Thread snazy
Github user snazy commented on the issue:

https://github.com/apache/cassandra/pull/236
  
No, it's no longer needed. I removed it.


---




[GitHub] cassandra issue #236: 9608 trunk

2018-07-26 Thread jasobrown
Github user jasobrown commented on the issue:

https://github.com/apache/cassandra/pull/236
  
Is `javaexec.in.sh` still needed? It looks like all the Java checks are in 
`cassandra.in.sh` now. The file is not referenced by dtests or ccm, either.


---




[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/239
  
@iamaleksey I've addressed your comments, including the one about disabling 
faster streaming for legacy counter shards.

I did add a much less expensive check for STCS. It won't get all SSTables 
accurately, but it is way cheaper than what I have for LCS. Let me know your 
thoughts.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205339231
  
--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * 
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.getTotalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize);
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.getComponents())
+{
+@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+long bytesRead = 0;
--- End diff --

I did rename this.


---
