[GitHub] cassandra-dtest pull request #41: 14421

2019-01-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r245848691
  
--- Diff: upgrade_tests/thrift_upgrade_test.py ---
@@ -350,29 +469,41 @@ def test_sparse_supercolumn_with_renames(self):
 client.transport.open()
 client.set_keyspace('ks')
 
-cf = _create_sparse_super_cf('sparse_super_1')
-client.system_add_column_family(cf)
+_create_sparse_super_cf(client, 'sparse_super_1')
+
+#The alter after was failing claiming the column family didn't exist.
+#This test is so slow and it's Thrift and going away "soon" and we
+#Don't run it on every commit so just sleep
+import time
+time.sleep(5)
--- End diff --

Do we want to sleep for a fixed duration, or keep retrying patiently until it succeeds?
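A bounded retry loop is usually more robust than a fixed sleep. A minimal sketch of what that could look like (the helper name `retry_until` and its defaults are hypothetical, not part of the dtest tooling):

```python
import time


def retry_until(predicate, timeout=30.0, interval=0.5):
    """Poll predicate() until it returns True or the timeout elapses.

    Returns True as soon as the predicate succeeds, False if the
    deadline passes first.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return bool(predicate())  # one last check at the deadline
```

The schema-agreement wait would then be `assert retry_until(lambda: cf_exists(client, 'sparse_super_1'))` instead of an unconditional `time.sleep(5)` (with `cf_exists` being a hypothetical existence check the test can perform).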


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra-dtest pull request #41: 14421

2019-01-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r245846732
  
--- Diff: requirements.txt ---
@@ -1,6 +1,6 @@
 -e git+https://github.com/datastax/python-driver.git@cassandra-test#egg=cassandra-driver
 # Used ccm version is tracked by cassandra-test branch in ccm repo. Please create a PR there for fixes or upgrades to new releases.
--e git+https://github.com/riptano/ccm.git@cassandra-test#egg=ccm
+-e git+https://github.com/aweisberg/ccm.git@cassandra-test#egg=ccm
--- End diff --

Could you ensure that you change this back once the outstanding changes in upstream `ccm` are merged?


---




[GitHub] cassandra-dtest pull request #41: 14421

2019-01-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r245848315
  
--- Diff: upgrade_tests/thrift_upgrade_test.py ---
@@ -10,99 +10,182 @@
ColumnParent, ConsistencyLevel,
SlicePredicate, SliceRange)
 from thrift_test import _i64, get_thrift_client
-from tools.assertions import assert_length_equal
+from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal
+from tools.misc import wait_for_agreement, add_skip
 from .upgrade_base import UpgradeTester
 from .upgrade_manifest import build_upgrade_pairs
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
 
 
-def _create_dense_super_cf(name):
-return Cassandra.CfDef('ks', name, column_type='Super',
+def _create_dense_super_cf(thrift, name):
+cfdef = Cassandra.CfDef('ks', name, column_type='Super',
key_validation_class='AsciiType',# pk
comparator_type='AsciiType', # ck
default_validation_class='AsciiType',# SC value
subcomparator_type='LongType')   # SC key
+thrift.system_add_column_family(cfdef)
+wait_for_agreement(thrift)
 
 
-def _create_sparse_super_cf(name):
-cd1 = ColumnDef('col1', 'LongType', None, None)
-cd2 = ColumnDef('col2', 'LongType', None, None)
-return Cassandra.CfDef('ks', name, column_type='Super',
+def _create_sparse_super_cf(thrift, name):
+cd1 = ColumnDef('col1'.encode(), 'LongType', None, None)
+cd2 = ColumnDef('col2'.encode(), 'LongType', None, None)
+cfdef = Cassandra.CfDef('ks', name, column_type='Super',
column_metadata=[cd1, cd2],
key_validation_class='AsciiType',
comparator_type='AsciiType',
subcomparator_type='AsciiType')
+thrift.system_add_column_family(cfdef)
+wait_for_agreement(thrift)
 
 
-def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key'):
+def unpack(list):
--- End diff --

`list` is a Python built-in - 
https://docs.python.org/3/tutorial/datastructures.html#more-on-lists

You should not use `list` as a variable name. Although Python doesn't 
complain, shadowing the built-in is not good practice.
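A small illustration of why the shadowing matters; `bad_unpack` and `unpack_rows` are made-up names for demonstration only:

```python
def bad_unpack(list):
    # The parameter now shadows the built-in: inside this function,
    # list(...) is the argument, not the constructor.
    try:
        return list((1, 2))  # tries to *call* the passed-in object
    except TypeError:
        return "shadowed"


def unpack_rows(rows):
    # A descriptive name leaves the built-in usable.
    return [dict(row) for row in rows]


print(bad_unpack([1, 2]))        # "shadowed"
print(unpack_rows([{"a": 1}]))   # [{'a': 1}]
```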


---




[GitHub] cassandra-dtest pull request #41: 14421

2019-01-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r245846129
  
--- Diff: conftest.py ---
@@ -409,13 +429,16 @@ def pytest_collection_modifyitems(items, config):
This function is called upon during the pytest test collection phase and allows for modification
 of the test items within the list
 """
-if not config.getoption("--collect-only") and config.getoption("--cassandra-dir") is None:
-if config.getoption("--cassandra-version") is None:
+collect_only = config.getoption("--collect-only")
+cassandra_dir = config.getoption("--cassandra-dir")
+cassandra_version = config.getoption("--cassandra-version")
+if not collect_only and cassandra_dir  is None:
+if  cassandra_version is None:
--- End diff --

Nit: Extra space `if  cassandra_version`. Please fix on commit.


---




[GitHub] cassandra-dtest pull request #41: 14421

2019-01-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r245848375
  
--- Diff: upgrade_tests/thrift_upgrade_test.py ---
@@ -10,99 +10,182 @@
ColumnParent, ConsistencyLevel,
SlicePredicate, SliceRange)
 from thrift_test import _i64, get_thrift_client
-from tools.assertions import assert_length_equal
+from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal
+from tools.misc import wait_for_agreement, add_skip
 from .upgrade_base import UpgradeTester
 from .upgrade_manifest import build_upgrade_pairs
 
 since = pytest.mark.since
 logger = logging.getLogger(__name__)
 
 
-def _create_dense_super_cf(name):
-return Cassandra.CfDef('ks', name, column_type='Super',
+def _create_dense_super_cf(thrift, name):
+cfdef = Cassandra.CfDef('ks', name, column_type='Super',
key_validation_class='AsciiType',# pk
comparator_type='AsciiType', # ck
default_validation_class='AsciiType',# SC value
subcomparator_type='LongType')   # SC key
+thrift.system_add_column_family(cfdef)
+wait_for_agreement(thrift)
 
 
-def _create_sparse_super_cf(name):
-cd1 = ColumnDef('col1', 'LongType', None, None)
-cd2 = ColumnDef('col2', 'LongType', None, None)
-return Cassandra.CfDef('ks', name, column_type='Super',
+def _create_sparse_super_cf(thrift, name):
+cd1 = ColumnDef('col1'.encode(), 'LongType', None, None)
+cd2 = ColumnDef('col2'.encode(), 'LongType', None, None)
+cfdef = Cassandra.CfDef('ks', name, column_type='Super',
column_metadata=[cd1, cd2],
key_validation_class='AsciiType',
comparator_type='AsciiType',
subcomparator_type='AsciiType')
+thrift.system_add_column_family(cfdef)
+wait_for_agreement(thrift)
 
 
-def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key'):
+def unpack(list):
+result_list = []
+for item_dict in list:
+normalized_dict = {}
+for key, value in item_dict.items():
+if hasattr(value, "items"):
+assert(key == '')
+for a, b in value.items():
+normalized_dict[a] = b
+else:
+normalized_dict[key] = value
+result_list.append(normalized_dict)
+return result_list
+
+
+def add_value(list):
--- End diff --

Please rename `list` variable.


---




[GitHub] cassandra-dtest pull request #41: 14421

2019-01-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r245846879
  
--- Diff: sstable_generation_loading_test.py ---
@@ -27,45 +28,68 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
 fixture_dtest_setup.allow_log_errors = True
 
 upgrade_from = None
-compact = False
+test_compact = False
+
+def compact(self):
+return self.fixture_dtest_setup.cluster.version() < MAJOR_VERSION_4 and self.test_compact
 
 def create_schema(self, session, ks, compression):
 create_ks(session, ks, rf=2)
-create_cf(session, "standard1", compression=compression, compact_storage=self.compact)
+create_cf(session, "standard1", compression=compression, compact_storage=self.compact())
 create_cf(session, "counter1", compression=compression, columns={'v': 'counter'},
-  compact_storage=self.compact)
+  compact_storage=self.compact())
 
 def test_sstableloader_compression_none_to_none(self):
+if self.__class__.__name__ != 'TestBasedSSTableLoader' and self.upgrade_from is None:
--- End diff --

These two lines are redundant. Perhaps refactor them into a small method and reuse it?
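One way to hoist the duplicated guard into a single place; the sketch below only approximates the classes in the diff (class and method names are illustrative):

```python
class LoaderTestBase:
    """Illustrative stand-in for the dtest loader test base class."""
    upgrade_from = None

    def should_run(self):
        # The condition that was repeated at the top of each test method.
        return (self.__class__.__name__ == 'TestBasedSSTableLoader'
                or self.upgrade_from is not None)


class TestBasedSSTableLoader(LoaderTestBase):
    pass


print(LoaderTestBase().should_run())          # False
print(TestBasedSSTableLoader().should_run())  # True
```

Each test would then start with a single `if not self.should_run(): pytest.skip(...)`, so the condition itself lives in one place.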


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-20 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r243218570
  
--- Diff: upgrade_tests/upgrade_through_versions_test.py ---
@@ -794,31 +800,33 @@ def create_upgrade_class(clsname, version_metas, protocol_version,
 MultiUpgrade = namedtuple('MultiUpgrade', ('name', 'version_metas', 'protocol_version', 'extra_config'))
 
 MULTI_UPGRADES = (
-# Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.1, trunk)
-MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_EndsAt_Trunk_HEAD',
- version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3, extra_config=None),
-MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD',
- version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3,
+# Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.11)
+MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_EndsAt_3_11_X',
+ version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3, extra_config=None),
+MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_3_11_X_HEAD',
+ version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3,
  extra_config=(
  ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
  )),
 
 # Proto v4 upgrades (v4 is supported on 2.2, 3.0, 3.1, trunk)
 MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_EndsAt_Trunk_HEAD',
- version_metas=[current_2_2_x, current_3_0_x, indev_3_x, ], protocol_version=4, extra_config=None),
+ version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4, extra_config=None),
 MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD',
- version_metas=[current_2_2_x, current_3_0_x, indev_3_x], protocol_version=4,
+ version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4,
  extra_config=(
  ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
  )),
+#Beta versions don't work with this test since it doesn't specify use beta in the client
+#It's fine I guess for now? Can update on release
 # Proto v5 upgrades (v5 is supported on 3.0, 3.11, trunk)
-MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_EndsAt_Trunk_HEAD',
- version_metas=[current_3_0_x, current_3_x, indev_trunk], protocol_version=5, extra_config=None),
-MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD',
- version_metas=[current_3_0_x, current_3_x, indev_trunk], protocol_version=5,
- extra_config=(
- ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'),
- )),
+# MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_EndsAt_Trunk_HEAD',
--- End diff --

Do we want to leave commented code in? It is ok if this is transient and 
we'll eventually remove it.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-12 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r241232030
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map compressionOptions)
+{
+validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
--- End diff --

From my brief reading of the JNI bindings, it seems that zstd-jni uses 
`ZSTD_compress` and not `ZSTD_compressCCtx`. The latter allows you to pass in a 
context, which would have been more efficient. For now, I think it would be ok to 
simply pass in the compression level. The only value of keeping multiple 
compressor objects around is to retain the `CompressionParams`. It would be 
useful to have a caching factory to avoid creating multiple objects with the 
same compression params.

As far as thread safety goes, the JNI code looks thread-safe, specifically the 
compress and decompress methods. As pointed out earlier, they create and destroy 
the compression context on each invocation, which is memory-unfriendly. So, 
although the zstd benchmarks may look great, I am not so sure about this JNI 
binding.

We should definitely add a JMH benchmark for zstd.


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-11 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r240824209
  
--- Diff: sstable_generation_loading_test.py ---
@@ -141,6 +166,16 @@ def load_sstable_with_configuration(self, pre_compression=None, post_compression
 session.execute("UPDATE standard1 SET v='{}' WHERE KEY='{}' AND c='col'".format(i, i))
 session.execute("UPDATE counter1 SET v=v+1 WHERE KEY='{}'".format(i))
 
+#Will upgrade to a version that doesn't support compact storage so revert the compact
+#storage, this doesn't actually fix it yet
+if self.compact() and default_install_version >= '4':
--- End diff --

Use `LooseVersion` here as well? Also, prefer constants over magic strings / numbers.
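`LooseVersion` (from the stdlib's `distutils.version` at the time; deprecated in newer Pythons) compares versions component-wise, which avoids the traps of plain string comparison:

```python
from distutils.version import LooseVersion

# String comparison is lexical, so '10.0' sorts before '4' ...
print('10.0' < '4')                                   # True (wrong for versions)

# ... while LooseVersion compares numeric components.
print(LooseVersion('10.0') > LooseVersion('4'))       # True
print(LooseVersion('4.0') >= LooseVersion('3.11.3'))  # True
```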


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-11 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r240827199
  
--- Diff: upgrade_tests/paging_test.py ---
@@ -503,22 +513,31 @@ def test_basic_compound_paging(self):
 );
 """)
 
-cursor.execute("""
-CREATE TABLE test2 (
-k int,
-c1 int,
-c2 int,
-v text,
-PRIMARY KEY (k, c1, c2)
-) WITH COMPACT STORAGE;
-""")
+testing_compact_storage = self.cluster.version() < '4.0'
+if testing_compact_storage:
+cursor.execute("""
+CREATE TABLE test2 (
+k int,
+c1 int,
+c2 int,
+v text,
+PRIMARY KEY (k, c1, c2)
+) WITH COMPACT STORAGE;
+""")
+
+version_string = self.upgrade_version_string()
+#4.0 doesn't support compact storage
+if version_string == 'trunk' or version_string >= '4.0':
--- End diff --

Use `LooseVersion` here as well?


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-11 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r240827110
  
--- Diff: upgrade_tests/paging_test.py ---
@@ -457,21 +458,30 @@ def test_basic_paging(self):
 );
 """)
 
-cursor.execute("""
-CREATE TABLE test2 (
-k int,
-c int,
-v text,
-PRIMARY KEY (k, c)
-) WITH COMPACT STORAGE;
-""")
+testing_compact_storage = self.cluster.version() < '4.0'
--- End diff --

Use `LooseVersion` here as well?


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-11 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r240823234
  
--- Diff: dtest_setup.py ---
@@ -454,22 +463,14 @@ def maybe_setup_jacoco(self, cluster_name='test'):
 
 @staticmethod
 def create_ccm_cluster(dtest_setup):
-logger.debug("cluster ccm directory: " + dtest_setup.test_path)
+logger.info("cluster ccm directory: " + dtest_setup.test_path)
--- End diff --

Does this need to be info level?
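The practical difference behind the question: with the usual INFO default, `logger.debug` output stays out of the logs while `logger.info` is always emitted. A self-contained sketch (logger name and messages are illustrative):

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
logger = logging.getLogger("ccm-setup-example")  # illustrative name
logger.addHandler(handler)
logger.setLevel(logging.INFO)                    # a common default level

logger.debug("cluster ccm directory: /tmp/dtest")  # suppressed at INFO
logger.info("cluster started")                     # emitted

print(stream.getvalue().strip())                   # cluster started
```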


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-11 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r240827177
  
--- Diff: upgrade_tests/paging_test.py ---
@@ -503,22 +513,31 @@ def test_basic_compound_paging(self):
 );
 """)
 
-cursor.execute("""
-CREATE TABLE test2 (
-k int,
-c1 int,
-c2 int,
-v text,
-PRIMARY KEY (k, c1, c2)
-) WITH COMPACT STORAGE;
-""")
+testing_compact_storage = self.cluster.version() < '4.0'
--- End diff --

Use `LooseVersion` here as well?


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-11 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r240823966
  
--- Diff: sstable_generation_loading_test.py ---
@@ -27,45 +27,68 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
 fixture_dtest_setup.allow_log_errors = True
 
 upgrade_from = None
-compact = False
+test_compact = False
+
+def compact(self):
+return self.fixture_dtest_setup.cluster.version() < '4' and self.test_compact
--- End diff --

Use `LooseVersion` instead of a string?


---




[GitHub] cassandra-dtest pull request #41: 14421

2018-12-11 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra-dtest/pull/41#discussion_r240828717
  
--- Diff: upgrade_tests/upgrade_manifest.py ---
@@ -70,22 +89,19 @@ def clone_with_local_env_version(self):
 return self
 
 
-indev_2_0_x = None  # None if release not likely
-current_2_0_x = VersionMeta(name='current_2_0_x', family='2.0.x', variant='current', version='2.0.17', min_proto_v=1, max_proto_v=2, java_versions=(7,))
-
 indev_2_1_x = VersionMeta(name='indev_2_1_x', family='2.1.x', variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
-current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1.x', variant='current', version='2.1.17', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
+current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1.x', variant='current', version='2.1.20', min_proto_v=1, max_proto_v=3, java_versions=(7, 8))
 
 indev_2_2_x = VersionMeta(name='indev_2_2_x', family='2.2.x', variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
-current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2.x', variant='current', version='2.2.9', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
+current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2.x', variant='current', version='2.2.13', min_proto_v=1, max_proto_v=4, java_versions=(7, 8))
 
-indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0.x', variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,))
-current_3_0_x = VersionMeta(name='current_3_0_x', family='3.0.x', variant='current', version='3.0.12', min_proto_v=3, max_proto_v=4, java_versions=(8,))
+indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0.x', variant='indev', version='github:aweisberg/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,))
--- End diff --

This needs to be reverted?


---




[GitHub] cassandra pull request #290: 14841 trunk

2018-10-31 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/290#discussion_r229892479
  
--- Diff: src/java/org/apache/cassandra/gms/Gossiper.java ---
@@ -136,6 +139,38 @@
 
private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
+private boolean haveMajorVersion3Nodes = true;
+
+final com.google.common.base.Supplier haveMajorVersion3NodesSupplier = () ->
+{
+//Once there are no prior version nodes we don't need to keep rechecking
+if (!haveMajorVersion3Nodes)
+return false;
+
+Iterable allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
+CassandraVersion referenceVersion = null;
+
+for (InetAddressAndPort host : allHosts)
+{
+CassandraVersion version = getReleaseVersion(host);
+
+//Raced with changes to gossip state
+if (version == null)
+continue;
+
+if (referenceVersion == null)
+referenceVersion = version;
+
+if (version.major < 4)
--- End diff --

This will capture nodes with major version 2 as well. If that is ok, we should probably rename the variables to reflect that.
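The check in question flags any pre-4.0 peer, so 2.x nodes trip it too; a tiny model of that guard (function name invented for illustration):

```python
def has_pre_4_0_nodes(major_versions):
    # Mirrors `version.major < 4`: true for 3.x AND 2.x peers alike,
    # hence the suggestion to name the flag after "pre-4.0" rather than "3".
    return any(major < 4 for major in major_versions)


print(has_pre_4_0_nodes([4, 3]))  # True
print(has_pre_4_0_nodes([4, 2]))  # True  (2.x also counts)
print(has_pre_4_0_nodes([4, 4]))  # False
```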


---




[GitHub] cassandra issue #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-23 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/279
  
Hi @jonmeredith, the changes look good. I just had a minor comment but other than that, I think we're good.


---




[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-23 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/279#discussion_r227581961
  
--- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---
@@ -36,9 +36,34 @@
 
 import static org.junit.Assert.*;
 
+/**
+ * Long BufferPool test - make sure that the BufferPool allocates and recycles
+ * ByteBuffers under heavy concurrent usage.
+ *
+ * The test creates two groups of threads
+ *
+ * - the burn producer/consumer pair that allocates 1/10 poolSize and then returns
+ *   all the memory to the pool. 50% is freed by the producer, 50% passed to the consumer thread.
+ *
+ * - a ring of worker threads that allocate buffers and either immediately free them,
+ *   or pass to the next worker thread for it to be freed on it's behalf.  Periodically
+ *   all memory is freed by the thread.
+ *
+ * While the burn/worker threads run, the original main thread checks that all of the threads are still
+ * making progress every 10s (no locking issues, or exits from assertion failures),
+ * and that every chunk has been freed at least once during the previous cycle (if that was possible).
+ *
+ * The test does not expect to survive out-of-memory errors, so needs sufficient heap memory
+ * for non-direct buffers and the debug tracking objects that check the allocate buffers.
+ * (The timing is very interesting when Xmx is lowered to increase garbage collection pauses, but do
+ * not set it too low).
+ */
 public class LongBufferPoolTest
 {
private static final Logger logger = LoggerFactory.getLogger(LongBufferPoolTest.class);
+final int avgBufferSize = 16 << 10;
--- End diff --

Constants are typically uppercased with underscores as separators, e.g. `AVG_BUFFER_SIZE`. Do you also need this to be package-private? It can be made private and static.


---




[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-17 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/279#discussion_r226129933
  
--- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---
@@ -327,24 +371,35 @@ BufferCheck sample()
 
 return checks.get(index);
 }
-
-private int sum1toN(int n)
-{
-return (n * (n + 1)) / 2;
-}
 }));
 }
 
-boolean first = true;
 while (!latch.await(10L, TimeUnit.SECONDS))
 {
-if (!first)
-BufferPool.assertAllRecycled();
-first = false;
+int stalledThreads = 0;
+int doneThreads = 0;
+
 for (AtomicBoolean progress : makingProgress)
 {
-assert progress.get();
-progress.set(false);
+if (progress.getAndSet(false) == false)
--- End diff --

```suggestion
if (!progress.getAndSet(false))
```


---




[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-17 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/279#discussion_r226129803
  
--- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---
@@ -181,7 +222,7 @@ void checkpoint()
 void testOne() throws Exception
 {
 
-long currentTargetSize = rand.nextInt(poolSize / 1024) == 0 ? 0 : targetSize;
+long currentTargetSize = (rand.nextInt(poolSize / 1024) == 0 || freedAllMemory[threadIdx].get() == false) ? 0 : targetSize;
--- End diff --

```suggestion
long currentTargetSize = (rand.nextInt(poolSize / 1024) == 0 || !freedAllMemory[threadIdx].get()) ? 0 : targetSize;
```

This is the canonical way in Java.


---




[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-15 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/279#discussion_r225404206
  
--- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---
@@ -327,24 +373,41 @@ BufferCheck sample()
 
 return checks.get(index);
 }
-
-private int sum1toN(int n)
-{
-return (n * (n + 1)) / 2;
-}
 }));
 }
 
-boolean first = true;
 while (!latch.await(10L, TimeUnit.SECONDS))
 {
-if (!first)
-BufferPool.assertAllRecycled();
-first = false;
+int stalledThreads = 0;
+int doneThreads = 0;
+
 for (AtomicBoolean progress : makingProgress)
 {
-assert progress.get();
-progress.set(false);
+if (progress.getAndSet(false) == false)
+stalledThreads++;
+}
+
+for (Future r : ret)
+{
+if (r.isDone())
+doneThreads++;
+}
+if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers.
+{ // Assertions failures on the threads will be caught below.
+assert stalledThreads == 0;
+boolean allFreed = burnFreed.getAndSet(false);
+for (AtomicBoolean freedMemory : freedAllMemory)
+{
+allFreed = allFreed && freedMemory.getAndSet(false);
+}
+if (allFreed)
+{
--- End diff --

According to the code style, you can skip braces here.
Reference: http://cassandra.apache.org/doc/latest/development/code_style.html


---




[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-15 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/279#discussion_r225404225
  
--- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---
@@ -327,24 +373,41 @@ BufferCheck sample()
 
 return checks.get(index);
 }
-
-private int sum1toN(int n)
-{
-return (n * (n + 1)) / 2;
-}
 }));
 }
 
-boolean first = true;
 while (!latch.await(10L, TimeUnit.SECONDS))
 {
-if (!first)
-BufferPool.assertAllRecycled();
-first = false;
+int stalledThreads = 0;
+int doneThreads = 0;
+
 for (AtomicBoolean progress : makingProgress)
 {
-assert progress.get();
-progress.set(false);
+if (progress.getAndSet(false) == false)
+stalledThreads++;
+}
+
+for (Future r : ret)
+{
+if (r.isDone())
+doneThreads++;
+}
+if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers.
+{ // Assertions failures on the threads will be caught below.
+assert stalledThreads == 0;
+boolean allFreed = burnFreed.getAndSet(false);
+for (AtomicBoolean freedMemory : freedAllMemory)
+{
+allFreed = allFreed && freedMemory.getAndSet(false);
+}
+if (allFreed)
+{
+BufferPool.assertAllRecycled();
+}
+else
+{
--- End diff --

According to the code style, you can skip braces here.
Reference: http://cassandra.apache.org/doc/latest/development/code_style.html


---




[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-15 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/279#discussion_r225404189
  
--- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---
@@ -327,24 +373,41 @@ BufferCheck sample()
 
 return checks.get(index);
 }
-
-private int sum1toN(int n)
-{
-return (n * (n + 1)) / 2;
-}
 }));
 }
 
-boolean first = true;
 while (!latch.await(10L, TimeUnit.SECONDS))
 {
-if (!first)
-BufferPool.assertAllRecycled();
-first = false;
+int stalledThreads = 0;
+int doneThreads = 0;
+
 for (AtomicBoolean progress : makingProgress)
 {
-assert progress.get();
-progress.set(false);
+if (progress.getAndSet(false) == false)
+stalledThreads++;
+}
+
+for (Future r : ret)
+{
+if (r.isDone())
+doneThreads++;
+}
+if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers.
+{                     // Assertion failures on the threads will be caught below.
+assert stalledThreads == 0;
+boolean allFreed = burnFreed.getAndSet(false);
+for (AtomicBoolean freedMemory : freedAllMemory)
+{
--- End diff --

According to the code style, you can skip braces here.
Reference: 
http://cassandra.apache.org/doc/latest/development/code_style.html


---




[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest

2018-10-15 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/279#discussion_r225403619
  
--- Diff: 
test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java ---
@@ -88,37 +110,51 @@ public void testAllocate(int threadCount, long duration, int poolSize) throws In
 final CountDownLatch latch = new CountDownLatch(threadCount);
 final SPSCQueue[] sharedRecycle = new SPSCQueue[threadCount];
 final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount];
+AtomicBoolean burnFreed = new AtomicBoolean(false);
--- End diff --

This can be made `final`.


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r216088059
  
--- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
@@ -231,4 +241,437 @@ public String toString()
 sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
 return sbuf.toString();
 }
+
+public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
+{
+return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
+
+}
+
+public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+{
+return messagingVersion >= MessagingService.VERSION_40
+   ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
+   : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
+
+}
+
+/**
+ * Implementations contain the mechanics and logic of parsing incoming 
messages. Allows for both non-blocking
+ * and blocking styles of interaction via the {@link 
#process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
+ * methods, respectively.
+ *
+ * Does not contain the actual deserialization code for message fields 
nor payload. That is left to the
+ * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
+ */
+public static abstract class MessageInProcessor
+{
+/**
+ * The current state of deserializing an incoming message. This 
enum is only used in the nonblocking versions.
+ */
+public enum State
+{
+READ_PREFIX,
+READ_IP_ADDRESS,
+READ_VERB,
+READ_PARAMETERS_SIZE,
+READ_PARAMETERS_DATA,
+READ_PAYLOAD_SIZE,
+READ_PAYLOAD
+}
+
+static final int VERB_LENGTH = Integer.BYTES;
+
+/**
+ * The default target for consuming deserialized {@link MessageIn}.
+ */
+private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
+
+final InetAddressAndPort peer;
+final int messagingVersion;
+
+/**
+ * Abstracts out depending directly on {@link 
MessagingService#receive(MessageIn, int)}; this makes tests more sane
+ * as they don't require nor trigger the entire message processing 
circus.
+ */
+final BiConsumer<MessageIn, Integer> messageConsumer;
+
+/**
+ * Captures the current {@link State} of processing a message. 
Primarily useful in the non-blocking use case.
+ */
+State state = State.READ_PREFIX;
+
+/**
+ * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
+ */
+MessageHeader messageHeader;
+
+/**
+ * Process the buffer in a non-blocking manner. Will try to read 
out as much of a message(s) as possible,
+ * and send any fully deserialized messages to {@link 
#messageConsumer}.
+ */
+public abstract void process(ByteBuf in) throws IOException;
+
+/**
+ * Process the buffer in a blocking manner. Will read as many 
messages as possible, blocking for more data,
+ * and send any fully deserialized messages to {@link 
#messageConsumer}.
+ */
+public abstract void process(RebufferingByteBufDataInputPlus in) 
throws IOException;
+
+MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+{
+this.peer = peer;
+this.messagingVersion = messagingVersion;
+this.messageConsumer = messageConsumer;
+}
+
+/**
+ * Only applicable in the non-blocking use case, and should only be used for testing!!!
+ */
+@VisibleForTesting
+public MessageHeader getMessageHeader()
+{
+return messageHeader;
+}
+
+/**
+ * A simple struct to hold the message header data as it is being 
built up.
+ */
+public static class MessageHeader
+{
+public int messageId;
+long constructionTime;
+public InetAddressAndPort from;
+public MessagingService.Verb verb;
+int payloadSize;
 

[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r216103691
  
--- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
@@ -18,143 +18,296 @@
 
 package org.apache.cassandra.net.async;
 
-import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import org.apache.cassandra.io.util.DataInputBuffer;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ParameterType;
-import org.apache.cassandra.utils.vint.VIntCoding;
+import org.apache.cassandra.net.MessageIn.MessageInProcessor;
 
 /**
  * Parses incoming messages as per the 4.0 internode messaging protocol.
  */
-public class MessageInHandler extends BaseMessageInHandler
+public class MessageInHandler extends ChannelInboundHandlerAdapter
 {
 public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class);
 
-private MessageHeader messageHeader;
+private final InetAddressAndPort peer;
 
-MessageInHandler(InetAddressAndPort peer, int messagingVersion)
+private final BufferHandler bufferHandler;
+private volatile boolean closed;
+
+public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor, boolean handlesLargeMessages)
+{
+this.peer = peer;
+
+bufferHandler = handlesLargeMessages
+? new BlockingBufferHandler(messageProcessor)
+: new NonblockingBufferHandler(messageProcessor);
+}
+
+public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException
+{
+if (!closed)
+{
+bufferHandler.channelRead(ctx, (ByteBuf) msg);
+}
+else
+{
+ReferenceCountUtil.release(msg);
+ctx.close();
+}
+}
+
+public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+{
+if (cause instanceof EOFException)
+logger.trace("eof reading from socket; closing", cause);
+else if (cause instanceof UnknownTableException)
+logger.warn("Got message from unknown table while reading from socket; closing", cause);
+else if (cause instanceof IOException)
+logger.trace("IOException reading from socket; closing", cause);
+else
+logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause);
+
+close();
+ctx.close();
+}
+
+public void channelInactive(ChannelHandlerContext ctx)
+{
+logger.trace("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress());
+close();
+ctx.fireChannelInactive();
+}
+
+void close()
 {
-this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
+closed = true;
+bufferHandler.close();
 }
 
-public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+boolean isClosed()
 {
-super(peer, messagingVersion, messageConsumer);
+return closed;
+}
 
-assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong messaging version for this handler: got %d, but expect %d or higher",
-                                                                       messagingVersion, MessagingService.VERSION_40);
-state = State.R

[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r216105848
  
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
 return sbuf.toString();
 }
 
+/**
+ * The main entry point for sending an internode message to a peer 
node in the cluster.
+ */
+public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+{
+captureTracingInfo(destinationId);
+
+out.writeInt(MessagingService.PROTOCOL_MAGIC);
+out.writeInt(id);
+
+// int cast cuts off the high-order half of the timestamp, which we can assume remains
+// the same between now and when the recipient reconstructs it.
+out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+serialize(out, messagingVersion);
+}
+
+/**
+ * Record any tracing data, if enabled on this message.
+ */
+@VisibleForTesting
+void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+{
+try
+{
+UUID sessionId = (UUID) getParameter(ParameterType.TRACE_SESSION);
+if (sessionId != null)
+{
+TraceState state = Tracing.instance.get(sessionId);
+String logMessage = String.format("Sending %s message to %s", verb, destinationId.connectionAddress());
+// session may have already finished; see CASSANDRA-5668
+if (state == null)
+{
+Tracing.TraceType traceType = (Tracing.TraceType) getParameter(ParameterType.TRACE_TYPE);
+traceType = traceType == null ? Tracing.TraceType.QUERY : traceType;
+Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), logMessage, traceType.getTTL());
+}
+}
+else
+{
+state.trace(logMessage);
+if (verb == MessagingService.Verb.REQUEST_RESPONSE)
+Tracing.instance.doneWithNonLocalSession(state);
+}
+}
+}
+catch (Exception e)
+{
+logger.warn("failed to capture the tracing info for an outbound message to {}, ignoring", destinationId, e);
+}
+}
+
+private Object getParameter(ParameterType type)
+{
+for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE)
+{
+if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type))
--- End diff --

Don't need the typecast to `ParameterType`


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r216106796
  
--- Diff: src/java/org/apache/cassandra/net/MessageOut.java ---
@@ -180,6 +199,73 @@ public String toString()
 return sbuf.toString();
 }
 
+/**
+ * The main entry point for sending an internode message to a peer 
node in the cluster.
+ */
+public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException
+{
+captureTracingInfo(destinationId);
+
+out.writeInt(MessagingService.PROTOCOL_MAGIC);
+out.writeInt(id);
+
+// int cast cuts off the high-order half of the timestamp, which we can assume remains
+// the same between now and when the recipient reconstructs it.
+out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos));
+serialize(out, messagingVersion);
+}
+
+/**
+ * Record any tracing data, if enabled on this message.
+ */
+@VisibleForTesting
+void captureTracingInfo(OutboundConnectionIdentifier destinationId)
+{
+try
+{
+UUID sessionId = (UUID) getParameter(ParameterType.TRACE_SESSION);
+if (sessionId != null)
--- End diff --

I assume `sessionId != null` means that tracing is enabled? Otherwise we 
should explicitly check whether tracing is enabled.


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r216102238
  
--- Diff: 
test/unit/org/apache/cassandra/net/MessageInProcessorPre40Test.java ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessageIn.MessageInProcessorPre40;
+import org.apache.cassandra.net.async.ByteBufDataOutputPlus;
+
+public class MessageInProcessorPre40Test
+{
+private static InetAddressAndPort addr;
+
+private MessageInProcessorPre40 processor;
+private ByteBuf buf;
+
+@BeforeClass
+public static void before() throws UnknownHostException
+{
+DatabaseDescriptor.daemonInitialization();
+addr = InetAddressAndPort.getByName("127.0.0.1");
+}
+
+@Before
+public void setup()
+{
+processor = new MessageInProcessorPre40(addr, MessagingService.VERSION_30, (messageIn, integer) -> {});
+}
+
+@After
+public void tearDown()
+{
+if (buf != null && buf.refCnt() > 0)
+buf.release();
+}
+
+@Test
+public void canReadNextParam_HappyPath() throws IOException
+{
+buildParamBufPre40(13);
+Assert.assertTrue(processor.canReadNextParam(buf));
+}
+
+@Test
+public void canReadNextParam_OnlyFirstByte() throws IOException
+{
+buildParamBufPre40(13);
+buf.writerIndex(1);
+Assert.assertFalse(processor.canReadNextParam(buf));
+}
+
+@Test
+public void canReadNextParam_PartialUTF() throws IOException
+{
+buildParamBufPre40(13);
+buf.writerIndex(5);
+Assert.assertFalse(processor.canReadNextParam(buf));
+}
+
+@Test
+public void canReadNextParam_TruncatedValueLength() throws IOException
+{
+buildParamBufPre40(13);
+buf.writerIndex(buf.writerIndex() - 13 - 2);
+Assert.assertFalse(processor.canReadNextParam(buf));
+}
+
+@Test
+public void canReadNextParam_MissingLastBytes() throws IOException
+{
+buildParamBufPre40(13);
+buf.writerIndex(buf.writerIndex() - 2);
+Assert.assertFalse(processor.canReadNextParam(buf));
+}
+
+private void buildParamBufPre40(int valueLength) throws IOException
+{
+buf = Unpooled.buffer(1024, 1024); // 1k should be enough for everybody!
--- End diff --

🤣


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r216086521
  
--- Diff: 
src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -36,6 +37,11 @@
 
 public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
 {
+/**
+ * Default to a very large value.
+ */
+private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
--- End diff --

I don't think it makes sense to wait more than 5 minutes to rebuffer. Given
that buffer sizes are on the order of a few megabytes at most, 5 minutes is an
eternity. It's best to keep the timeout short so failures surface sooner rather
than later, instead of threads being stuck waiting on a call that doesn't time
out in a reasonable amount of time.
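
To illustrate the fail-fast argument, here is a minimal sketch of a bounded blocking wait. The queue-based stand-in and all names are assumptions for illustration, not the actual RebufferingByteBufDataInputPlus implementation:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class RebufferTimeoutSketch
{
    // Hypothetical short bound, in the spirit of the review comment.
    static final long REBUFFER_BLOCK_IN_MILLIS = TimeUnit.MINUTES.toMillis(5);

    // Wait for the next buffer, but never longer than timeoutMillis: a
    // stuck peer produces a visible failure instead of a hung thread.
    static byte[] awaitNextBuffer(BlockingQueue<byte[]> queue, long timeoutMillis) throws InterruptedException
    {
        byte[] buf = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (buf == null)
            throw new IllegalStateException("no buffer arrived within " + timeoutMillis + "ms");
        return buf;
    }

    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
        queue.add(new byte[8]);
        System.out.println(awaitNextBuffer(queue, 100).length);
    }
}
```

With a short bound, a hung producer surfaces as an exception in the logs rather than a thread parked for days.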


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r216047177
  
--- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
@@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
 
 out = ctx.alloc().ioBuffer((int)currentFrameSize);
 
-captureTracingInfo(msg);
-serializeMessage(msg, out);
+@SuppressWarnings("resource")
+ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
+msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
+
+// next few lines are for debugging ... massively helpful!!
+// if we allocated too much buffer for this message, we'll log here.
+// if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
+if (out.isWritable())
+errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
--- End diff --

It's fine. We can leave it in.


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r215733891
  
--- Diff: 
src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -36,6 +37,11 @@
 
 public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel
 {
+/**
+ * Default to a very large value.
+ */
+private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2);
--- End diff --

It's risky to default to a very large value. Why do we want to default to
such a large value?


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r215735492
  
--- Diff: 
src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java 
---
@@ -393,6 +393,18 @@ private Channel getOrCreateChannel()
 }
 }
 
+private void onError(Throwable t)
+{
+try
+{
+session.onError(t).get(5, TimeUnit.MINUTES);
--- End diff --

Why do we have a 5 minute timeout? We should pull this out as a constant.
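
A tiny sketch of the suggested extraction (the constant and method names are hypothetical):

```java
import java.util.concurrent.TimeUnit;

public class TimeoutConstantSketch
{
    // Named constant instead of a bare literal, so the choice of 5
    // minutes is documented in one place and easy to change.
    static final long ON_ERROR_TIMEOUT_MINUTES = 5;

    static long onErrorTimeoutMillis()
    {
        return TimeUnit.MINUTES.toMillis(ON_ERROR_TIMEOUT_MINUTES);
    }

    public static void main(String[] args)
    {
        System.out.println(onErrorTimeoutMillis());
    }
}
```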


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r215739220
  
--- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
@@ -231,4 +241,437 @@ public String toString()
 sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb);
 return sbuf.toString();
 }
+
+public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
+{
+return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
+
+}
+
+public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+{
+return messagingVersion >= MessagingService.VERSION_40
+   ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
+   : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
+
+}
+
+/**
+ * Implementations contain the mechanics and logic of parsing incoming 
messages. Allows for both non-blocking
+ * and blocking styles of interaction via the {@link 
#process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)}
+ * methods, respectively.
+ *
+ * Does not contain the actual deserialization code for message fields 
nor payload. That is left to the
+ * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
+ */
+public static abstract class MessageInProcessor
+{
+/**
+ * The current state of deserializing an incoming message. This 
enum is only used in the nonblocking versions.
+ */
+public enum State
+{
+READ_PREFIX,
+READ_IP_ADDRESS,
+READ_VERB,
+READ_PARAMETERS_SIZE,
+READ_PARAMETERS_DATA,
+READ_PAYLOAD_SIZE,
+READ_PAYLOAD
+}
+
+static final int VERB_LENGTH = Integer.BYTES;
+
+/**
+ * The default target for consuming deserialized {@link MessageIn}.
+ */
+private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
+
+final InetAddressAndPort peer;
+final int messagingVersion;
+
+/**
+ * Abstracts out depending directly on {@link 
MessagingService#receive(MessageIn, int)}; this makes tests more sane
+ * as they don't require nor trigger the entire message processing 
circus.
+ */
+final BiConsumer<MessageIn, Integer> messageConsumer;
+
+/**
+ * Captures the current {@link State} of processing a message. 
Primarily useful in the non-blocking use case.
+ */
+State state = State.READ_PREFIX;
+
+/**
+ * Captures the current data we've parsed out of an incoming message. Primarily useful in the non-blocking use case.
+ */
+MessageHeader messageHeader;
+
+/**
+ * Process the buffer in a non-blocking manner. Will try to read 
out as much of a message(s) as possible,
+ * and send any fully deserialized messages to {@link 
#messageConsumer}.
+ */
+public abstract void process(ByteBuf in) throws IOException;
+
+/**
+ * Process the buffer in a blocking manner. Will read as many 
messages as possible, blocking for more data,
+ * and send any fully deserialized messages to {@link 
#messageConsumer}.
+ */
+public abstract void process(RebufferingByteBufDataInputPlus in) 
throws IOException;
+
+MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn, Integer> messageConsumer)
+{
+this.peer = peer;
+this.messagingVersion = messagingVersion;
+this.messageConsumer = messageConsumer;
+}
+
+/**
+ * Only applicable in the non-blocking use case, and should only be used for testing!!!
+ */
+@VisibleForTesting
+public MessageHeader getMessageHeader()
+{
+return messageHeader;
+}
+
+/**
+ * A simple struct to hold the message header data as it is being 
built up.
+ */
+public static class MessageHeader
+{
+public int messageId;
+long constructionTime;
+public InetAddressAndPort from;
+public MessagingService.Verb verb;
+int payloadSize;
 

[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r215733539
  
--- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java ---
@@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise)
 
 out = ctx.alloc().ioBuffer((int)currentFrameSize);
 
-captureTracingInfo(msg);
-serializeMessage(msg, out);
+@SuppressWarnings("resource")
+ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out);
+msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos);
+
+// next few lines are for debugging ... massively helpful!!
+// if we allocated too much buffer for this message, we'll log here.
+// if we allocated too little buffer space, we would have hit an exception when trying to write more bytes to it
+if (out.isWritable())
+errorLogger.error("{} reported message size {}, actual message size {}, msg {}",
--- End diff --

How likely are we to cause log spam?
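
One common way to bound potential log spam is a simple rate limiter that emits at most one message per interval. A hedged, standalone sketch (this is not Cassandra's actual logging helper; all names are invented):

```java
public class RateLimitedLogSketch
{
    private final long minIntervalNanos;
    private long lastEmittedNanos;
    private boolean emittedOnce;

    RateLimitedLogSketch(long minIntervalNanos)
    {
        this.minIntervalNanos = minIntervalNanos;
    }

    // Returns true when a message may be emitted, i.e. at most once per
    // minIntervalNanos; callers simply drop the message otherwise.
    synchronized boolean tryEmit(long nowNanos)
    {
        if (emittedOnce && nowNanos - lastEmittedNanos < minIntervalNanos)
            return false;
        emittedOnce = true;
        lastEmittedNanos = nowNanos;
        return true;
    }

    public static void main(String[] args)
    {
        RateLimitedLogSketch limiter = new RateLimitedLogSketch(1_000_000_000L); // 1 second
        System.out.println(limiter.tryEmit(System.nanoTime()));
    }
}
```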


---




[GitHub] cassandra pull request #253: 13630

2018-09-07 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/253#discussion_r215734926
  
--- Diff: 
src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---
@@ -183,6 +195,11 @@ public int available() throws EOFException
 return availableBytes;
 }
 
+public boolean isEmpty() throws EOFException
+{
+return available() == 0;
--- End diff --

The method `available()` has a side effect of 
`channelConfig.setAutoRead(true)` when the `availableBytes` falls below the 
`lowWatermark`. Are you sure that is ok?
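
For context, the autoRead toggle is the usual high/low watermark back-pressure pattern. A hedged, standalone sketch of that pattern (thresholds and names are invented for illustration, not the class's actual fields):

```java
public class WatermarkSketch
{
    static final int LOW_WATER_MARK = 4 * 1024;
    static final int HIGH_WATER_MARK = 16 * 1024;

    // Above the high watermark we stop reading from the socket; once the
    // consumer drains below the low watermark we resume. In between, the
    // current state is kept to avoid flapping.
    static boolean shouldAutoRead(int bufferedBytes, boolean currentlyReading)
    {
        if (bufferedBytes >= HIGH_WATER_MARK)
            return false;
        if (bufferedBytes < LOW_WATER_MARK)
            return true;
        return currentlyReading;
    }

    public static void main(String[] args)
    {
        System.out.println(shouldAutoRead(20 * 1024, true));
        System.out.println(shouldAutoRead(1024, false));
    }
}
```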


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213558154
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+private final ExecutorService es = Executors.newFixedThreadPool(1);
+private final Iterator> queryIterator;
+private final List targetHosts;
+private final List targetClusters;
+private final List> filters;
+private final List sessions;
+private final ResultHandler resultHandler;
+private final MetricRegistry metrics = new MetricRegistry();
+private final boolean debug;
+private final PrintStream out;
+
+public QueryReplayer(Iterator> queryIterator,
+ List targetHosts,
+ List resultPaths,
+ List> filters,
+ PrintStream out,
+ String useKeyspace,
+ String queryFilePathString,
+ boolean debug)
+{
+this.queryIterator = queryIterator;
+this.targetHosts = targetHosts;
+targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+this.filters = filters;
+sessions = useKeyspace != null ?
+   targetClusters.stream().map(c -> c.connect(useKeyspace)).collect(Collectors.toList()) :
+   targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
+resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
+this.debug = debug;
+this.out = out;
+}
+
+public void replay()
+{
+while (queryIterator.hasNext())
+{
+List queries = queryIterator.next();
+for (FQLQuery query : queries)
+{
+if (filters.stream().anyMatch(f -> !f.test(query)))
+continue;
+try (Timer.Context ctx = metrics.timer("queries").time())
+{
+List> results = new ArrayList<>(sessions.size());
+for (Session session : sessions)
+{
+try
+{
+if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace()))
+{
+if (debug)
+{
+out.println(String.format("Switching keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspace));
+

[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213557602
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+private final ExecutorService es = Executors.newFixedThreadPool(1);
+private final Iterator<List<FQLQuery>> queryIterator;
+private final List<String> targetHosts;
+private final List<Cluster> targetClusters;
+private final List<Predicate<FQLQuery>> filters;
+private final List<Session> sessions;
+private final ResultHandler resultHandler;
+private final MetricRegistry metrics = new MetricRegistry();
+private final boolean debug;
+private final PrintStream out;
+
+public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+ List<String> targetHosts,
+ List<File> resultPaths,
+ List<Predicate<FQLQuery>> filters,
+ PrintStream out,
+ String useKeyspace,
+ String queryFilePathString,
+ boolean debug)
+{
+this.queryIterator = queryIterator;
+this.targetHosts = targetHosts;
+targetClusters = targetHosts.stream().map(h -> 
Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+this.filters = filters;
+sessions = useKeyspace != null ?
+   targetClusters.stream().map(c -> 
c.connect(useKeyspace)).collect(Collectors.toList()) :
+   
targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+File queryFilePath = queryFilePathString != null ? new 
File(queryFilePathString) : null;
+resultHandler = new ResultHandler(targetHosts, resultPaths, 
queryFilePath);
+this.debug = debug;
+this.out = out;
+}
+
+public void replay()
+{
+while (queryIterator.hasNext())
+{
+List<FQLQuery> queries = queryIterator.next();
+for (FQLQuery query : queries)
+{
+if (filters.stream().anyMatch(f -> !f.test(query)))
+continue;
+try (Timer.Context ctx = metrics.timer("queries").time())
+{
+List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+for (Session session : sessions)
+{
+try
+{
+if (query.keyspace != null && 
!query.keyspace.equals(session.getLoggedKeyspace()))
+{
+if (debug)
+{
--- End diff --

Single statement blocks do not require braces.
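For reference, a hypothetical sketch of the suggested style (the class and method names here are illustrative, not taken from the patch): braces are dropped for single-statement bodies but kept where the block has more than one statement.

```java
// Hypothetical sketch of the suggested brace style; names are illustrative only.
public class BraceStyle
{
    static String describe(int n)
    {
        // Single-statement block: no braces needed.
        if (n < 0)
            return "negative";
        // Multi-statement block: braces stay.
        if (n == 0)
        {
            String msg = "zero";
            return msg;
        }
        return "positive";
    }

    public static void main(String[] args)
    {
        System.out.println(describe(-1) + " " + describe(0) + " " + describe(1));
    }
}
```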


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213559247
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java 
---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.util.PriorityQueue;
+
+import net.openhft.chronicle.queue.ExcerptTailer;
+import org.apache.cassandra.utils.AbstractIterator;
+
+public class FQLQueryIterator extends AbstractIterator<FQLQuery>
+{
+private final PriorityQueue<FQLQuery> pq;
+private final ExcerptTailer tailer;
+private final FQLQueryReader reader;
+
+/**
+ * Create an iterator over the FQLQueries in tailer
+ *
+ * Reads up to readAhead queries into memory to be able to sort them (the files are mostly sorted already)
+ */
+public FQLQueryIterator(ExcerptTailer tailer, int readAhead, boolean 
legacyFiles)
+{
+assert readAhead > 0 : "ReadAhead needs to be > 0";
--- End diff --

Nit: `ReadAhead` -> `readAhead` in the message
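A sketch of the corrected message, wrapped in a hypothetical standalone check (the class name is illustrative; the real code uses an `assert` statement):

```java
// Hypothetical sketch: the validation message matches the parameter's actual casing.
public class ReadAheadCheck
{
    static void checkReadAhead(int readAhead)
    {
        if (readAhead <= 0)
            throw new IllegalArgumentException("readAhead needs to be > 0");
    }

    public static void main(String[] args)
    {
        checkReadAhead(16); // ok, no exception
    }
}
```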


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213557051
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+private final ExecutorService es = Executors.newFixedThreadPool(1);
+private final Iterator<List<FQLQuery>> queryIterator;
+private final List<String> targetHosts;
--- End diff --

This field is unnecessary; it's only used in the constructor.
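A hedged sketch of the suggested cleanup, with simplified types and an illustrative class name: since `targetHosts` is only read in the constructor, it can stay a constructor parameter instead of becoming a field.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified, hypothetical sketch: targetHosts is consumed in the constructor
// to build derived state, so no field is needed for it.
public class ReplayerSketch
{
    private final List<String> targetClusters; // derived state kept as a field

    public ReplayerSketch(List<String> targetHosts) // parameter, not a field
    {
        // use the parameter directly; nothing outside the constructor reads it
        targetClusters = targetHosts.stream()
                                    .map(h -> "cluster:" + h)
                                    .collect(Collectors.toList());
    }

    public List<String> clusters()
    {
        return targetClusters;
    }
}
```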


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213564852
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/ResultStore.java ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.core.io.Closeable;
+import net.openhft.chronicle.queue.ChronicleQueue;
+import net.openhft.chronicle.queue.ChronicleQueueBuilder;
+import net.openhft.chronicle.queue.ExcerptAppender;
+import net.openhft.chronicle.wire.ValueOut;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+/**
+ * see FQLReplayTest#readResultFile for how to read files produced by this 
class
+ */
+public class ResultStore
+{
+private final List<ChronicleQueue> queues;
+private final List<ExcerptAppender> appenders;
+private final ChronicleQueue queryStoreQueue;
+private final ExcerptAppender queryStoreAppender;
+private final Set finishedHosts = new HashSet<>();
+private final File queryFilePath;
--- End diff --

Unused variable.


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213562282
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java 
---
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.AbstractIterator;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * Wraps a result set from the driver so that we can reuse the compare 
code when reading
+ * up a result set produced by ResultStore.
+ */
+public class DriverResultSet implements ResultHandler.ComparableResultSet
+{
+private final ResultSet resultSet;
+private final Throwable failureException;
+
+public DriverResultSet(ResultSet resultSet)
+{
+this(resultSet, null);
+}
+
+private DriverResultSet(ResultSet res, Throwable failureException)
+{
+resultSet = res;
+this.failureException = failureException;
+}
+
+public static DriverResultSet failed(Throwable ex)
+{
+return new DriverResultSet(null, ex);
+}
+
+public ResultHandler.ComparableColumnDefinitions getColumnDefinitions()
+{
+if (wasFailed())
+return new DriverColumnDefinitions(null, true);
+
+return new 
DriverColumnDefinitions(resultSet.getColumnDefinitions());
+}
+
+public boolean wasFailed()
+{
+return failureException != null;
+}
+
+public Throwable getFailureException()
+{
+return failureException;
+}
+
+public Iterator<ResultHandler.ComparableRow> iterator()
+{
+if (wasFailed())
+return Collections.emptyListIterator();
+return new AbstractIterator<ResultHandler.ComparableRow>()
+{
+Iterator<Row> iter = resultSet.iterator();
+protected ResultHandler.ComparableRow computeNext()
+{
+if (iter.hasNext())
+{
--- End diff --

Unnecessary braces for single-statement block (there are a bunch of these in
this file).


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213563398
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+public abstract class FQLQuery implements Comparable<FQLQuery>
+{
+public final long queryTime;
+public final QueryOptions queryOptions;
+public final int protocolVersion;
+public final String keyspace;
+
+public FQLQuery(String keyspace, int protocolVersion, QueryOptions 
queryOptions, long queryTime)
+{
+this.queryTime = queryTime;
+this.queryOptions = queryOptions;
+this.protocolVersion = protocolVersion;
+this.keyspace = keyspace;
+}
+
+public abstract ListenableFuture<ResultHandler.ComparableResultSet> execute(Session session);
+
+/**
+ * used when storing the queries executed
+ */
+public abstract BinLog.ReleaseableWriteMarshallable toMarshallable();
+
+/**
+ * Make sure we catch any query errors
+ *
+ * On error, this creates a failed ComparableResultSet with the 
exception set to be able to store
+ * this fact in the result file and handle comparison of failed result 
sets.
+ */
+ListenableFuture<ResultHandler.ComparableResultSet> handleErrors(ListenableFuture<ResultSet> result)
+{
+FluentFuture<DriverResultSet> fluentFuture = FluentFuture.from(result)
+   .transform(DriverResultSet::new, MoreExecutors.directExecutor());
+return fluentFuture.catching(Throwable.class, 
DriverResultSet::failed, MoreExecutors.directExecutor());
+}
+
+public boolean equals(Object o)
+{
+if (this == o) return true;
+if (!(o instanceof FQLQuery)) return false;
+FQLQuery fqlQuery = (FQLQuery) o;
+return queryTime == fqlQuery.queryTime &&
+   protocolVersion == fqlQuery.protocolVersion &&
+   
queryOptions.getValues().equals(fqlQuery.queryOptions.getValues()) &&
+   Objects.equals(keyspace, fqlQuery.keyspace);
+}
+
+public int hashCode()
+{
+return Objects.hash(queryTime, queryOptions, protocolVersion, 
keyspace);
+}
+
+public int compareTo(FQLQuery other)
+{
+return Longs.compare(queryTime, other.queryTime);
+}
+
+public static class Single extends FQLQuery
+{
+public final String query;
+public final List<ByteBuffer> values;
+
+public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryTime, String queryString, List<ByteBuffer> values)
+{
+super(keyspace, protocolVersion, queryOptions, queryTime);
+this.query = queryString;
+this.values = values;
+}

[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213562501
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
--- End diff --

Unused import.


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213558579
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+private final ExecutorService es = Executors.newFixedThreadPool(1);
+private final Iterator<List<FQLQuery>> queryIterator;
+private final List<String> targetHosts;
+private final List<Cluster> targetClusters;
+private final List<Predicate<FQLQuery>> filters;
+private final List<Session> sessions;
+private final ResultHandler resultHandler;
+private final MetricRegistry metrics = new MetricRegistry();
+private final boolean debug;
+private final PrintStream out;
+
+public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+ List<String> targetHosts,
+ List<File> resultPaths,
+ List<Predicate<FQLQuery>> filters,
+ PrintStream out,
+ String useKeyspace,
+ String queryFilePathString,
+ boolean debug)
+{
+this.queryIterator = queryIterator;
+this.targetHosts = targetHosts;
+targetClusters = targetHosts.stream().map(h -> 
Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+this.filters = filters;
+sessions = useKeyspace != null ?
+   targetClusters.stream().map(c -> 
c.connect(useKeyspace)).collect(Collectors.toList()) :
+   
targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+File queryFilePath = queryFilePathString != null ? new 
File(queryFilePathString) : null;
+resultHandler = new ResultHandler(targetHosts, resultPaths, 
queryFilePath);
+this.debug = debug;
+this.out = out;
+}
+
+public void replay()
+{
+while (queryIterator.hasNext())
+{
+List<FQLQuery> queries = queryIterator.next();
+for (FQLQuery query : queries)
+{
+if (filters.stream().anyMatch(f -> !f.test(query)))
+continue;
+try (Timer.Context ctx = metrics.timer("queries").time())
+{
+List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+for (Session session : sessions)
+{
+try
+{
+if (query.keyspace != null && 
!query.keyspace.equals(session.getLoggedKeyspace()))
+{
+if (debug)
+{
+out.println(String.format("Switching 
keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspace));
+

[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213558075
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+private final ExecutorService es = Executors.newFixedThreadPool(1);
+private final Iterator<List<FQLQuery>> queryIterator;
+private final List<String> targetHosts;
+private final List<Cluster> targetClusters;
+private final List<Predicate<FQLQuery>> filters;
+private final List<Session> sessions;
+private final ResultHandler resultHandler;
+private final MetricRegistry metrics = new MetricRegistry();
+private final boolean debug;
+private final PrintStream out;
+
+public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+ List<String> targetHosts,
+ List<File> resultPaths,
+ List<Predicate<FQLQuery>> filters,
+ PrintStream out,
+ String useKeyspace,
+ String queryFilePathString,
+ boolean debug)
+{
+this.queryIterator = queryIterator;
+this.targetHosts = targetHosts;
+targetClusters = targetHosts.stream().map(h -> 
Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+this.filters = filters;
+sessions = useKeyspace != null ?
+   targetClusters.stream().map(c -> 
c.connect(useKeyspace)).collect(Collectors.toList()) :
+   
targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+File queryFilePath = queryFilePathString != null ? new 
File(queryFilePathString) : null;
+resultHandler = new ResultHandler(targetHosts, resultPaths, 
queryFilePath);
+this.debug = debug;
+this.out = out;
+}
+
+public void replay()
+{
+while (queryIterator.hasNext())
+{
+List<FQLQuery> queries = queryIterator.next();
+for (FQLQuery query : queries)
+{
+if (filters.stream().anyMatch(f -> !f.test(query)))
+continue;
+try (Timer.Context ctx = metrics.timer("queries").time())
+{
+List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+for (Session session : sessions)
+{
+try
+{
+if (query.keyspace != null && 
!query.keyspace.equals(session.getLoggedKeyspace()))
+{
+if (debug)
+{
+out.println(String.format("Switching 
keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspac

[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213563877
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java 
---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.datastax.driver.core.BatchStatement;
+import io.netty.buffer.Unpooled;
+import net.openhft.chronicle.core.io.IORuntimeException;
+import net.openhft.chronicle.wire.ReadMarshallable;
+import net.openhft.chronicle.wire.ValueIn;
+import net.openhft.chronicle.wire.WireIn;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+public class FQLQueryReader implements ReadMarshallable
+{
+private FQLQuery query;
+private final boolean legacyFiles;
+
+public FQLQueryReader()
+{
+this(false);
+}
+
+public FQLQueryReader(boolean legacyFiles)
+{
+this.legacyFiles = legacyFiles;
+}
+
+public void readMarshallable(WireIn wireIn) throws IORuntimeException
+{
+String type = wireIn.read("type").text();
+int protocolVersion = wireIn.read("protocol-version").int32();
+QueryOptions queryOptions = 
QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read("query-options").bytes()),
 ProtocolVersion.decode(protocolVersion));
+long queryTime = wireIn.read("query-time").int64();
+String keyspace = legacyFiles ? null : 
wireIn.read("keyspace").text();
+switch (type)
+{
+case "single":
+String queryString = wireIn.read("query").text();
+query = new FQLQuery.Single(keyspace, protocolVersion, 
queryOptions, queryTime, queryString, queryOptions.getValues());
+break;
+case "batch":
+BatchStatement.Type batchType = 
BatchStatement.Type.valueOf(wireIn.read("batch-type").text());
+ValueIn in = wireIn.read("queries");
+int queryCount = in.int32();
+
+List<String> queries = new ArrayList<>(queryCount);
+for (int i = 0; i < queryCount; i++)
+queries.add(in.text());
+in = wireIn.read("values");
+int valueCount = in.int32();
+List<List<ByteBuffer>> values = new ArrayList<>(valueCount);
+for (int ii = 0; ii < valueCount; ii++)
+{
+List<ByteBuffer> subValues = new ArrayList<>();
+values.add(subValues);
+int numSubValues = in.int32();
+for (int zz = 0; zz < numSubValues; zz++)
+{
--- End diff --

Redundant braces


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213564246
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java 
---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Streams;
+
+public class ResultComparator
+{
+/**
+ * Compares the rows in rows
+ * the row at position x in rows will have come from host at position 
x in targetHosts
+ */
+public boolean compareRows(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+{
+if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull))
+return true;
+
+if (rows.stream().anyMatch(Objects::isNull))
+{
+handleMismatch(targetHosts, query, rows);
+return false;
+}
+
+ResultHandler.ComparableRow ref = rows.get(0);
+boolean equal = true;
+for (int i = 1; i < rows.size(); i++)
+{
+ResultHandler.ComparableRow compare = rows.get(i);
+if (!ref.equals(compare))
+{
+equal = false;
+handleMismatch(targetHosts, query, rows);
+}
+}
+return equal;
+}
+
+/**
+ * Compares the column definitions
+ *
+ * the column definitions at position x in cds will have come from 
host at position x in targetHosts
+ */
+public boolean compareColumnDefinitions(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+{
+if (cds.size() < 2)
+return true;
+
+boolean equal = true;
+List<ResultHandler.ComparableColumnDefinition> refDefs = cds.get(0).asList();
+for (int i = 1; i < cds.size(); i++)
+{
+List<ResultHandler.ComparableColumnDefinition> toCompare = cds.get(i).asList();
+if (!refDefs.equals(toCompare))
+{
+handleColumnDefMismatch(targetHosts, query, cds);
+equal = false;
+}
+}
+return equal;
+}
+
+private void handleMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableRow> rows)
+{
+System.out.println("MISMATCH:");
+System.out.println("Query = " + query);
+System.out.println("Results:");
+System.out.println(Streams.zip(rows.stream(), 
targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? 
"null" : r)).collect(Collectors.joining()));
+}
+
+private void handleColumnDefMismatch(List<String> targetHosts, FQLQuery query, List<ResultHandler.ComparableColumnDefinitions> cds)
+{
+System.out.println("COLUMN DEFINITION MISMATCH:");
+System.out.println("Query = " + query);
+System.out.println("Results: ");
+System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), 
(cd, host) -> String.format("%s: %s%n", host, 
columnDefinitionsString(cd))).collect(Collectors.joining()));
+}
+
+private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd)
+{
+StringBuilder sb = new StringBuilder();
+if (cd == null)
+{
--- End diff --

Redundant braces


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213562548
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
--- End diff --

unused import.


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213564366
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datastax.driver.core.ResultSet;
--- End diff --

Unused import.


---




[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213558547
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+private final ExecutorService es = Executors.newFixedThreadPool(1);
+private final Iterator<List<FQLQuery>> queryIterator;
+private final List<String> targetHosts;
+private final List<Cluster> targetClusters;
+private final List<Predicate<FQLQuery>> filters;
+private final List<Session> sessions;
+private final ResultHandler resultHandler;
+private final MetricRegistry metrics = new MetricRegistry();
+private final boolean debug;
+private final PrintStream out;
+
+public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+ List<String> targetHosts,
+ List<File> resultPaths,
+ List<Predicate<FQLQuery>> filters,
+ PrintStream out,
+ String useKeyspace,
+ String queryFilePathString,
+ boolean debug)
+{
+this.queryIterator = queryIterator;
+this.targetHosts = targetHosts;
+targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+this.filters = filters;
+sessions = useKeyspace != null ?
+   targetClusters.stream().map(c -> c.connect(useKeyspace)).collect(Collectors.toList()) :
+   targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+File queryFilePath = queryFilePathString != null ? new File(queryFilePathString) : null;
+resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath);
+this.debug = debug;
+this.out = out;
+}
+
+public void replay()
+{
+while (queryIterator.hasNext())
+{
+List<FQLQuery> queries = queryIterator.next();
+for (FQLQuery query : queries)
+{
+if (filters.stream().anyMatch(f -> !f.test(query)))
+continue;
+try (Timer.Context ctx = metrics.timer("queries").time())
+{
+List<ListenableFuture<ResultHandler.ComparableResultSet>> results = new ArrayList<>(sessions.size());
+for (Session session : sessions)
+{
+try
+{
+if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace()))
+{
+if (debug)
+{
+out.println(String.format("Switching keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspace));
+

[GitHub] cassandra pull request #255: Marcuse/14618

2018-08-29 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/255#discussion_r213562817
  
--- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java ---
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.FluentFuture;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.audit.FullQueryLogger;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.binlog.BinLog;
+
+public abstract class FQLQuery implements Comparable<FQLQuery>
+{
+public final long queryTime;
+public final QueryOptions queryOptions;
+public final int protocolVersion;
+public final String keyspace;
+
+public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryTime)
--- End diff --

Is there a reason why you didn't use `o.a.c.t.ProtocolVersion`?


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-16 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r210488658
  
--- Diff: src/java/org/apache/cassandra/metrics/MaxSampler.java ---
@@ -0,0 +1,59 @@
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+
+public abstract class MaxSampler<T> extends Sampler<T>
+{
+private int capacity;
+private MinMaxPriorityQueue<Sample<T>> queue;
+private long endTimeMillis = -1;
+private final Comparator<Sample<T>> comp = Collections.reverseOrder(Comparator.comparing(p -> p.count));
+
+public boolean isEnabled()
+{
+return endTimeMillis != -1 && clock.currentTimeMillis() <= endTimeMillis;
+}
+
--- End diff --

Nit: Extra space.


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-16 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r210487768
  
--- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java ---
@@ -132,6 +145,15 @@ public void close()
 {
 if (baseOp != null)
 baseOp.close();
+
+if (startTimeNanos != -1)
+{
+String cql = command.toCQLString();
+int timeMicros = (int) Math.min(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTimeNanos), Integer.MAX_VALUE);
--- End diff --

This should be `o.a.c.u.Clock`


---




[GitHub] cassandra issue #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/244
  
Stepping back a bit, I see the samplers are stateful classes that are
enabled and disabled. This means that if there is an exception in the RMI thread
executing `beginLocalSampling` and `finishLocalSampling`, the
samplers will continue to run indefinitely, which might cause issues. It
would be best to instantiate samplers on demand with a specific duration. Each
sampler can stop accepting new samples once the duration expires. This would
also mean that you no longer have to keep enabling and disabling samplers,
allowing you to get rid of `enabled` and other internal state; variables could
be made immutable, for example in `FrequencySampler`, where `StreamSummary` can be
declared `final` and initialized in the constructor.

If you want all samplers to start sampling at exactly the same moment (not
sure if that is a requirement), you could potentially use a shared
countdown latch. The thread instantiating the samplers can decrement it once it
has created and initialized all samplers. WDYT?
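The duration-bound idea suggested above can be sketched roughly like this; the class and method names are illustrative only, not the actual Cassandra API:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a sampler whose lifetime is fixed at construction,
// so it stops accepting samples on its own even if the RMI thread that
// would have called finishLocalSampling() died with an exception.
public class DurationBoundSampler
{
    private final long endTimeNanos;

    public DurationBoundSampler(long durationMillis)
    {
        this.endTimeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(durationMillis);
    }

    // No enable/disable flag to leak: the sampling window simply expires.
    public boolean isEnabled()
    {
        return System.nanoTime() <= endTimeNanos;
    }

    public static void main(String[] args) throws InterruptedException
    {
        DurationBoundSampler s = new DurationBoundSampler(50);
        System.out.println(s.isEnabled()); // true inside the 50 ms window
        Thread.sleep(100);
        System.out.println(s.isEnabled()); // false once the window expires
    }
}
```

For the simultaneous-start variant, the constructor could additionally take a shared `CountDownLatch` and `isEnabled()` could also require the latch to have reached zero.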


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697381
  
--- Diff: src/java/org/apache/cassandra/metrics/FrequencySampler.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.clearspring.analytics.stream.StreamSummary;
+
+/**
+ * Find the most frequent sample. A sample adds to the sum of its key ie
+ * add("x", 10); and add("x", 20); will result in "x" = 30. This uses StreamSummary to only store the
+ * approximate cardinality (capacity) of keys. If the number of distinct keys exceed the capacity, the error of the
+ * sample may increase depending on distribution of keys among the total set.
+ *
+ * @param <T>
+ */
+public abstract class FrequencySampler<T> extends Sampler<T>
+{
+private static final Logger logger = LoggerFactory.getLogger(FrequencySampler.class);
+private boolean enabled = false;
+
+private StreamSummary<T> summary;
+
+/**
+ * Start to record samples
+ *
+ * @param capacity
+ *Number of sample items to keep in memory, the lower this is
+ *the less accurate results are. For best results use value
+ *close to cardinality, but understand the memory trade offs.
+ */
+public synchronized void beginSampling(int capacity)
+{
+if (!enabled)
+{
+summary = new StreamSummary(capacity);
+enabled = true;
+}
+}
+
+/**
+ * Call to stop collecting samples, and gather the results
+ * @param count Number of most frequent items to return
+ */
+public synchronized List<Sample<T>> finishSampling(int count)
+{
+List<Sample<T>> results = Collections.EMPTY_LIST;
--- End diff --

`Collections.emptyList()` is safer.
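A minimal illustration of the difference (not part of the patch): `Collections.EMPTY_LIST` is a raw `List`, so assigning it to a generic variable compiles only with an unchecked warning, while `Collections.emptyList()` infers the element type.

```java
import java.util.Collections;
import java.util.List;

public class EmptyListDemo
{
    // Raw type: compiles, but only with an unchecked-assignment warning.
    @SuppressWarnings("unchecked")
    static List<String> raw()
    {
        return Collections.EMPTY_LIST;
    }

    // Type-safe: the element type is inferred, no warning.
    static List<String> typed()
    {
        return Collections.emptyList();
    }

    public static void main(String[] args)
    {
        System.out.println(raw().isEmpty());   // prints "true"
        System.out.println(typed().isEmpty()); // prints "true"
    }
}
```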


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697017
  
--- Diff: src/java/org/apache/cassandra/metrics/TableMetrics.java ---
@@ -281,7 +301,7 @@ public Long getValue()
 public final Meter readRepairRequests;
 public final Meter shortReadProtectionRequests;
 
-public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
+public final Map<SamplerType, Sampler<ByteBuffer>> samplers;
--- End diff --

This can be replaced with an `EnumMap`.
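For reference, a minimal `EnumMap` sketch; the enum constants are copied from the diff above, while the `String` value type is just for illustration:

```java
import java.util.EnumMap;
import java.util.Map;

public class EnumMapDemo
{
    enum SamplerType
    {
        READS, WRITES, LOCAL_READ_TIME, WRITE_SIZE, CAS_CONTENTIONS
    }

    // EnumMap stores values in an array indexed by the enum ordinal,
    // so it is more compact and faster than a HashMap for enum keys.
    static Map<SamplerType, String> build()
    {
        Map<SamplerType, String> samplers = new EnumMap<>(SamplerType.class);
        samplers.put(SamplerType.READS, "frequency");
        return samplers;
    }

    public static void main(String[] args)
    {
        System.out.println(build().get(SamplerType.READS)); // prints "frequency"
    }
}
```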


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697707
  
--- Diff: src/java/org/apache/cassandra/metrics/Sampler.java ---
@@ -0,0 +1,67 @@
+package org.apache.cassandra.metrics;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public abstract class Sampler<T>
--- End diff --

It would be nice to have a jmh benchmark for the new `Sampler`s


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697487
  
--- Diff: src/java/org/apache/cassandra/metrics/Sampler.java ---
@@ -0,0 +1,67 @@
+package org.apache.cassandra.metrics;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public abstract class Sampler<T>
+{
+public enum SamplerType
+{
+READS, WRITES, LOCAL_READ_TIME, WRITE_SIZE, CAS_CONTENTIONS
+}
+
+@VisibleForTesting
+static final ThreadPoolExecutor samplerExecutor = new JMXEnabledThreadPoolExecutor(1, 1,
+TimeUnit.SECONDS,
+new LinkedBlockingQueue<>(),
+new NamedThreadFactory("Sampler"),
+"internal");
+
+public void addSample(final T item, final int value)
+{
+if (isEnabled())
+samplerExecutor.execute(() -> insert(item, value));
+}
+
+protected abstract void insert(T item, long value);
+
+public abstract boolean isEnabled();
+
+public abstract void beginSampling(int capacity);
+
+public abstract List<Sample<T>> finishSampling(int count);
+
+public abstract String toString(T value);
+
+/**
+ * Represents the ranked items collected during a sample period
+ */
+public static class Sample<S> implements Serializable
+{
+
--- End diff --

Extra white space?


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697651
  
--- Diff: src/java/org/apache/cassandra/tools/nodetool/ProfileLoad.java ---
@@ -0,0 +1,178 @@
+package org.apache.cassandra.tools.nodetool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.join;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+
+import org.apache.cassandra.metrics.Sampler.SamplerType;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.Lists;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+
+@Command(name = "profileload", description = "Low footprint profiling of 
activity for a period of time")
+public class ProfileLoad extends NodeToolCmd
+{
+@Arguments(usage = "<keyspace> <cfname> <duration>", description = "The keyspace, column family name, and duration in milliseconds")
+private List<String> args = new ArrayList<>();
+
+@Option(name = "-s", description = "Capacity of the sampler, higher 
for more accuracy (Default: 256)")
+private int capacity = 256;
+
+@Option(name = "-k", description = "Number of the top samples to list 
(Default: 10)")
+private int topCount = 10;
+
+@Option(name = "-a", description = "Comma separated list of samplers 
to use (Default: all)")
+private String samplers = join(SamplerType.values(), ',');
+
+@Override
+public void execute(NodeProbe probe)
+{
+checkArgument(args.size() == 3 || args.size() == 1 || args.size() == 0, "Invalid arguments, either [keyspace table duration] or [duration] or no args");
+checkArgument(topCount < capacity, "TopK count (-k) option must be smaller then the summary capacity (-s)");
+String keyspace = null;
+String table = null;
+Integer duration = 1;
+if(args.size() == 3)
+{
+keyspace = args.get(0);
+table = args.get(1);
+duration = Integer.valueOf(args.get(2));
+}
+else if (args.size() == 1)
+{
+duration = Integer.valueOf(args.get(0));
+}
+// generate the list of samplers
+List<String> targets = Lists.newArrayList();
+List<String> available = Arrays.stream(SamplerType.values()).map(Enum::toString).collect(Collectors.toList());
+for (String s : samplers.split(","))
+{
+String sampler = s.trim().toUpperCase();
+checkArgument(available.contains(sampler), String.format("'%s' sampler is not available from: %s", s, Arrays.toString(SamplerType.values())));
+targets.add(sampler);
+}
+
+Map<String, List<CompositeData>> results;
+try
+{
+if (keyspace == null)
+{
--- End diff --

Braces are unnecessary.


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697217
  
--- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java ---
@@ -113,6 +123,11 @@ static ReadExecutionController forCommand(ReadCommand 
command)
 throw e;
 }
 }
+if (baseCfs.metric.topLocalReadQueryTime.isEnabled())
+{
--- End diff --

Single line if conditions don't need braces.


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697430
  
--- Diff: src/java/org/apache/cassandra/metrics/FrequencySampler.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.clearspring.analytics.stream.StreamSummary;
+
+/**
+ * Find the most frequent sample. A sample adds to the sum of its key ie
+ * add("x", 10); and add("x", 20); will result in "x" = 30. This uses StreamSummary to only store the
+ * approximate cardinality (capacity) of keys. If the number of distinct keys exceed the capacity, the error of the
+ * sample may increase depending on distribution of keys among the total set.
+ *
+ * @param <T>
+ */
+public abstract class FrequencySampler<T> extends Sampler<T>
+{
+private static final Logger logger = LoggerFactory.getLogger(FrequencySampler.class);
+private boolean enabled = false;
+
+private StreamSummary<T> summary;
+
+/**
+ * Start to record samples
+ *
+ * @param capacity
+ *Number of sample items to keep in memory, the lower this is
+ *the less accurate results are. For best results use value
+ *close to cardinality, but understand the memory trade offs.
+ */
+public synchronized void beginSampling(int capacity)
+{
+if (!enabled)
+{
+summary = new StreamSummary(capacity);
+enabled = true;
+}
+}
+
+/**
+ * Call to stop collecting samples, and gather the results
+ * @param count Number of most frequent items to return
+ */
+public synchronized List<Sample<T>> finishSampling(int count)
+{
+List<Sample<T>> results = Collections.EMPTY_LIST;
+if (enabled)
+{
+enabled = false;
+results = summary.topK(count)
+ .stream()
+ .map(c -> new Sample<>(c.getItem(), c.getCount(), c.getError()))
+ .collect(Collectors.toList());
+}
+return results;
+}
+
+protected synchronized void insert(final T item, final long value)
+{
+// samplerExecutor is single threaded but still need
+// synchronization against jmx calls to finishSampling
+if (enabled && value > 0)
+{
+try
+{
+summary.offer(item, (int) Math.min(value, 
Integer.MAX_VALUE));
+} catch (Exception e)
+{
+logger.trace("Failure to offer sample", e);
+}
+}
+}
+
+public boolean isEnabled()
+{
+return enabled;
+}
+
+public void setEnabled(boolean enabled)
+{
+this.enabled = enabled;
--- End diff --

This allows the user of the class to enable the `FrequencySampler` without
actually initializing the `summary` variable, which will cause an NPE on the next insert.
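A stripped-down sketch of the hazard and one possible guard; the names mirror the diff, but `Object` stands in for `StreamSummary<T>` and the guarded setter is hypothetical:

```java
public class FrequencySamplerHazard
{
    private Object summary; // stand-in for StreamSummary<T>
    private boolean enabled;

    public void beginSampling(int capacity)
    {
        summary = new Object(); // real code: new StreamSummary<>(capacity)
        enabled = true;
    }

    // Guarded setter: refuses to enable before beginSampling() has
    // initialized `summary`, closing the NPE window.
    public void setEnabled(boolean enabled)
    {
        if (enabled && summary == null)
            throw new IllegalStateException("beginSampling() must be called first");
        this.enabled = enabled;
    }

    public boolean isEnabled()
    {
        return enabled;
    }

    public static void main(String[] args)
    {
        FrequencySamplerHazard s = new FrequencySamplerHazard();
        try
        {
            s.setEnabled(true); // would otherwise NPE later in insert()
        }
        catch (IllegalStateException e)
        {
            System.out.println("rejected: " + e.getMessage());
        }
        s.beginSampling(256);
        s.setEnabled(true);
        System.out.println(s.isEnabled()); // prints "true"
    }
}
```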


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697637
  
--- Diff: src/java/org/apache/cassandra/tools/nodetool/ProfileLoad.java ---
@@ -0,0 +1,178 @@
+package org.apache.cassandra.tools.nodetool;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.join;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
+
+import org.apache.cassandra.metrics.Sampler.SamplerType;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
+import org.apache.cassandra.utils.Pair;
+
+import com.google.common.collect.Lists;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+
+@Command(name = "profileload", description = "Low footprint profiling of 
activity for a period of time")
+public class ProfileLoad extends NodeToolCmd
+{
+@Arguments(usage = "<keyspace> <cfname> <duration>", description = "The keyspace, column family name, and duration in milliseconds")
+private List<String> args = new ArrayList<>();
+
+@Option(name = "-s", description = "Capacity of the sampler, higher 
for more accuracy (Default: 256)")
+private int capacity = 256;
+
+@Option(name = "-k", description = "Number of the top samples to list 
(Default: 10)")
+private int topCount = 10;
+
+@Option(name = "-a", description = "Comma separated list of samplers 
to use (Default: all)")
+private String samplers = join(SamplerType.values(), ',');
+
+@Override
+public void execute(NodeProbe probe)
+{
+checkArgument(args.size() == 3 || args.size() == 1 || args.size() == 0, "Invalid arguments, either [keyspace table duration] or [duration] or no args");
+checkArgument(topCount < capacity, "TopK count (-k) option must be smaller then the summary capacity (-s)");
+String keyspace = null;
+String table = null;
+Integer duration = 1;
--- End diff --

What is the unit for `duration`? It might be better to just use 
`java.time.Duration`?
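A small sketch of the `java.time.Duration` idea, assuming the positional argument keeps its current millisecond interpretation; the helper name is hypothetical:

```java
import java.time.Duration;

public class DurationArg
{
    // Parse the CLI argument once and carry the unit in the type,
    // so downstream code never has to guess ms vs. seconds.
    static Duration parseDurationMillis(String arg)
    {
        return Duration.ofMillis(Long.parseLong(arg));
    }

    public static void main(String[] args)
    {
        Duration d = parseDurationMillis("10000");
        System.out.println(d.toMillis());   // prints "10000"
        System.out.println(d.getSeconds()); // prints "10"
    }
}
```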


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697287
  
--- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java ---
@@ -132,6 +147,17 @@ public void close()
 {
 if (baseOp != null)
 baseOp.close();
+
+if (startTime != -1)
+{
+String cql = command.toCQLString();
+int time = (int) Math.min(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime), Integer.MAX_VALUE);
--- End diff --

`timeMillis` or `millis` for brevity?


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697244
  
--- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java ---
@@ -113,6 +123,11 @@ static ReadExecutionController forCommand(ReadCommand 
command)
 throw e;
 }
 }
+if (baseCfs.metric.topLocalReadQueryTime.isEnabled())
+{
+result.startTime = System.nanoTime();
--- End diff --

I prefer naming primitive variables with units for example - 
`startTimeNanos`.


---




[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436

2018-08-03 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/244#discussion_r207697297
  
--- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java ---
@@ -132,6 +147,17 @@ public void close()
 {
 if (baseOp != null)
 baseOp.close();
+
+if (startTime != -1)
+{
+String cql = command.toCQLString();
+int time = (int) Math.min(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime), Integer.MAX_VALUE);
+ColumnFamilyStore cfs = 
ColumnFamilyStore.getIfExists(baseMetadata.id);
+if(cfs != null)
+{
--- End diff --

You can skip braces for single line if conditions.
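
A minimal illustration of the style point (generic example, not the patched code):

```java
public class BraceStyle
{
    // A single-statement if body may drop its braces per the project's style.
    public static String describe(Object cfs)
    {
        if (cfs != null)
            return "present"; // braces omitted: one-line body
        return "dropped";
    }

    public static void main(String[] args)
    {
        System.out.println(describe(new Object()) + " " + describe(null));
    }
}
```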


---




[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/239
  
@iamaleksey made a few more changes - 

1. Got rid of `IStreamWriter`
2. Ensured we're logging the configuration warning only once at start up 
iff zero copy streaming is enabled
3. Few stylistic changes
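
Point 2 above can be sketched with a standard log-once guard (an illustrative pattern, not the actual patch; the class and message are assumptions):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class WarnOnce
{
    private static final AtomicBoolean warned = new AtomicBoolean(false);

    // Returns true only on the call that actually emitted the warning.
    public static boolean maybeWarn()
    {
        if (warned.compareAndSet(false, true))
        {
            System.err.println("Zero copy streaming is enabled");
            return true;
        }
        return false;
    }

    public static void main(String[] args)
    {
        // Only the first call logs, however many times this runs.
        System.out.println(maybeWarn() + " " + maybeWarn());
    }
}
```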


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205646170
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
---
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public final class ComponentManifest implements Iterable<Component>
+{
+private final LinkedHashMap<Component, Long> components;
+
+public ComponentManifest(Map<Component, Long> components)
+{
+this.components = new LinkedHashMap<>(components);
+}
+
+public long sizeOf(Component component)
+{
+Long size = components.get(component);
+if (size == null)
+throw new IllegalArgumentException("Component " + component + 
" is not present in the manifest");
+return size;
+}
+
+public long totalSize()
+{
+long totalSize = 0;
+for (Long size : components.values())
+totalSize += size;
+return totalSize;
+}
+
+public List<Component> components()
+{
+return new ArrayList<>(components.keySet());
+}
+
+@Override
+public boolean equals(Object o)
+{
+if (this == o)
+return true;
+
+if (!(o instanceof ComponentManifest))
+return false;
+
+ComponentManifest that = (ComponentManifest) o;
+return components.equals(that.components);
+}
+
+@Override
+public int hashCode()
+{
+return components.hashCode();
+}
+
+public static final IVersionedSerializer<ComponentManifest> serializer 
= new IVersionedSerializer<ComponentManifest>()
+{
+public void serialize(ComponentManifest manifest, DataOutputPlus 
out, int version) throws IOException
+{
+out.writeUnsignedVInt(manifest.components.size());
+for (Map.Entry<Component, Long> entry : 
manifest.components.entrySet())
+{
+out.writeByte(entry.getKey().type.id);
--- End diff --

Done. I'm just using `component.name`. I think this should be sufficient 
for this PR's scope.
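
The name-vs-id trade-off can be shown with plain `java.io` streams (a generic sketch, not Cassandra's `DataOutputPlus`/`Component` types): serializing by name is self-describing and survives id renumbering, at the cost of a few extra bytes per entry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class NameVsId
{
    public static byte[] writeByName(String componentName)
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeUTF(componentName); // length-prefixed string: self-describing
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static String readName(byte[] serialized)
    {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized)))
        {
            return in.readUTF();
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        System.out.println(readName(writeByName("Data.db"))); // prints Data.db
    }
}
```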


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205639791
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+
+private final TableId tableId;
+private final StreamSession session;
+private final CassandraStreamHeader header;
+private final int fileSequenceNumber;
+
+public CassandraBlockStreamReader(StreamMessageHeader messageHeader, 
CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair sstables if 
the session has a pending repair id
+if 
(!session.getPendingRepair().equals(messageHeader.pendingRepair))
+throw new IllegalStateException(format("Stream Session & 
SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId));
+}
+
+this.header = streamHeader;
+this.session = session;
+this.tableId = messageHeader.tableId;
+this.fileSequenceNumber = messageHeader.sequenceNumber;
+}
+
+/**
+ * @param inputPlus where this reads data from
+ * @return SSTable transferred
+ * @throws IOException if reading the remote sstable fails. Will throw 
an RTE if local write fails.
+ */
+@SuppressWarnings("resource") // input needs to remain open, streams 
on top of it can't be closed
+@Override
+public SSTableMultiWriter read(DataInputPlus inputPlus) throws 
IOException
+{
+ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+if (cfs == null)
+{
+// schema was dropped during streaming
+throw new IOException("Table " + tableId + " was dropped 
during streaming");
+}
+
+ComponentManifest manifest = header.componentManifest;
+long totalSize = manifest.totalSize();
+
+logger.debug("[Stream #{}] Started receiving sstable #{} from {}, 
size = {}, table = {}",
+ session.planId(),
+ fileSequenceNumber,
+ session.peer,
+ prettyPrintMemory(totalSize),
+ cfs.metadata());
+
+  

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205532095
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession 
session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * <p>
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.totalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}",
+ session.planId(),
+ sstable.getFilename(),
+ session.peer,
+ sstable.getSSTableMetadata().repairedAt,
+ prettyPrintMemory(totalSize));
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = 
(ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.components())
+{
+@SuppressWarnings("resource") // this is closed after the file 
is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new 
RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+logger.debug("[Stream #{}] Block streaming {}.{} gen {} 
component {} size {}", session.planId(),
+ sstable.getKeyspaceName(),
+ sstable.getColumnFamilyName(),
+ sstable.descriptor.generation,
+ component, length);
--- End diff --

Fixed.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205532049
  
--- Diff: 
test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java ---
@@ -43,8 +51,38 @@ public void serializerTest()
  new 
ArrayList<>(),
  
((CompressionMetadata) null),
  0,
- 
SerializationHeader.makeWithoutStats(metadata).toComponent());
+ 
SerializationHeader.makeWithoutStats(metadata).toComponent(),
+ 
metadata.id);
 
 SerializationUtils.assertSerializationCycle(header, 
CassandraStreamHeader.serializer);
 }
+
+@Test
+public void serializerTest_FullSSTableTransfer()
+{
+String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)";
+TableMetadata metadata = CreateTableStatement.parse(ddl, 
"ks").build();
+
+ComponentManifest manifest = new ComponentManifest(new 
HashMap<Component, Long>(ImmutableMap.of(Component.DATA, 100L)));
--- End diff --

Fixed. Not sure why I did this in the first place.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205532001
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java ---
@@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus 
in, int version) throws I
 sections.add(new 
SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong()));
 CompressionInfo compressionInfo = 
CompressionInfo.serializer.deserialize(in, version);
 int sstableLevel = in.readInt();
+
 SerializationHeader.Component header =  
SerializationHeader.serializer.deserialize(sstableVersion, in);
 
-return new CassandraStreamHeader(sstableVersion, format, 
estimatedKeys, sections, compressionInfo, sstableLevel, header);
+TableId tableId = TableId.deserialize(in);
+boolean fullStream = in.readBoolean();
+ComponentManifest manifest = null;
+DecoratedKey firstKey = null;
+
+if (fullStream)
+{
+manifest = ComponentManifest.serializer.deserialize(in, 
version);
+ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in);
+IPartitioner partitioner = 
partitionerMapper.apply(tableId);
+if (partitioner == null)
+throw new 
IllegalArgumentException(String.format("Could not determine partitioner for 
tableId {}", tableId));
--- End diff --

Fixed.


---




[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/239
  
@iamaleksey I've addressed your comments including the one about disabling 
faster streaming for legacy counter shards.

I did add a much less expensive check for STCS. It won't get all SSTables 
accurately but it is way cheaper than what I have for LCS. Let me know your 
thoughts.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-26 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205339231
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession 
session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * <p>
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.getTotalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = 
(ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.getComponents())
+{
+@SuppressWarnings("resource") // this is closed after the file 
is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new 
RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+long bytesRead = 0;
--- End diff --

I did rename this.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205299016
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession 
session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * <p>
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.getTotalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = 
(ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.getComponents())
+{
+@SuppressWarnings("resource") // this is closed after the file 
is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new 
RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+long bytesRead = 0;
+logger.debug("[Stream #{}] Block streaming {}.{} gen {} 
component {} size {}", session.planId(),
+sstable.getKeyspaceName(), 
sstable.getColumnFamilyName(), sstable.descriptor.generation, component, 
length);
+
+bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, 
limiter);
--- End diff --

Yes, this *should* be the same. I avoided making that assumption and 
instead use what `writeToChannel()` returns. It is also used to update the 
`session.progress()`. Ideally, `bytesRead == length`, else we have some bug. I 
did rewrite this to make it concise.
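
The accumulation pattern described here can be sketched as follows (an illustrative stand-in, not the patched class; `transferToChannel` is a hypothetical substitute for `writeToChannel()`): trust the byte count the write call returns rather than assuming it equals the file length.

```java
import java.util.List;

public class ProgressTracker
{
    public static long streamAll(List<Long> componentLengths)
    {
        long progress = 0L;
        for (long length : componentLengths)
        {
            // Use what the transfer reports; ideally bytesWritten == length,
            // and any mismatch points at a bug rather than being masked.
            long bytesWritten = transferToChannel(length);
            progress += bytesWritten;
        }
        return progress;
    }

    // Stand-in that "transfers" everything; a real channel write reports
    // exactly how many bytes it moved.
    private static long transferToChannel(long length)
    {
        return length;
    }

    public static void main(String[] args)
    {
        System.out.println(streamAll(List.of(100L, 250L))); // prints 350
    }
}
```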


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205298316
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession 
session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * <p>
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.getTotalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
--- End diff --

Fixed.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205298028
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession 
session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * <p>
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.getTotalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = 
(ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.getComponents())
+{
+@SuppressWarnings("resource") // this is closed after the file 
is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new 
RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+long bytesRead = 0;
+logger.debug("[Stream #{}] Block streaming {}.{} gen {} 
component {} size {}", session.planId(),
+sstable.getKeyspaceName(), 
sstable.getColumnFamilyName(), sstable.descriptor.generation, component, 
length);
+
+bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, 
limiter);
+progress += bytesRead;
+
+session.progress(sstable.descriptor.filenameFor(component), 
ProgressInfo.Direction.OUT, bytesRead,
+ length);
+
+logger.debug("[Stream #{}] Finished block streaming {}.{} gen 
{} component {} to {}, xfered = {}, length = {}, totalSize = {}",
+ session.planId(), sstable.getKeyspaceName(), 
sstable.getColumnFamilyName(),

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205297956
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamManager;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static 
org.apache.cassandra.streaming.StreamManager.StreamRateLimiter;
+
+/**
+ * CassandraBlockStreamWriter streams the entire SSTable to given channel.
+ */
+public class CassandraBlockStreamWriter implements IStreamWriter
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamWriter.class);
+
+private final SSTableReader sstable;
+private final ComponentManifest manifest;
+private final StreamSession session;
+private final StreamRateLimiter limiter;
+
+public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession 
session, ComponentManifest manifest)
+{
+this.session = session;
+this.sstable = sstable;
+this.manifest = manifest;
+this.limiter =  StreamManager.getRateLimiter(session.peer);
+}
+
+/**
+ * Stream the entire file to given channel.
+ * 
+ *
+ * @param output where this writes data to
+ * @throws IOException on any I/O error
+ */
+@Override
+public void write(DataOutputStreamPlus output) throws IOException
+{
+long totalSize = manifest.getTotalSize();
+logger.debug("[Stream #{}] Start streaming sstable {} to {}, 
repairedAt = {}, totalSize = {}", session.planId(),
+ sstable.getFilename(), session.peer, 
sstable.getSSTableMetadata().repairedAt, totalSize);
+
+long progress = 0L;
+ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = 
(ByteBufDataOutputStreamPlus) output;
+
+for (Component component : manifest.getComponents())
+{
+@SuppressWarnings("resource") // this is closed after the file 
is transferred by ByteBufDataOutputStreamPlus
+FileChannel in = new 
RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel();
+
+// Total Length to transmit for this file
+long length = in.size();
+
+// tracks write progress
+long bytesRead = 0;
+logger.debug("[Stream #{}] Block streaming {}.{} gen {} 
component {} size {}", session.planId(),
+sstable.getKeyspaceName(), 
sstable.getColumnFamilyName(), sstable.descriptor.generation, component, 
length);
+
+bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, 
limiter);
+progress += bytesRead;
+
+session.progress(sstable.descriptor.filenameFor(component), 
ProgressInfo.Direction.OUT, bytesRead,
+ length);
+
+logger.debug("[Stream #{}] Finished block streaming {}.{} gen 
{} component {} to {}, xfered = {}, length = {}, totalSize = {}",
+ session.planId(), sstable.getKeyspaceName(), 
sstable.getColumnFamilyName(),
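The loop in the quoted `write` method hands each component's `FileChannel` to `ByteBufDataOutputStreamPlus.writeToChannel`, which drains the whole file into the streaming channel under a rate limiter. That method is internal Cassandra API; as a rough, self-contained analogue only, here is a chunked channel-to-channel drain built on `FileChannel.transferTo`, with a plain per-call byte cap standing in for the rate limiter (all names below are illustrative, not Cassandra's):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

public class BlockTransferSketch
{
    /**
     * Drains the whole file channel into the target channel, moving at most
     * chunkSize bytes per transferTo() call (a crude stand-in for rate limiting).
     * Returns the total number of bytes moved.
     */
    public static long drain(FileChannel in, WritableByteChannel out, long chunkSize) throws IOException
    {
        long position = 0;
        long size = in.size();
        while (position < size)
            position += in.transferTo(position, Math.min(chunkSize, size - position), out);
        return position;
    }

    public static void main(String[] args) throws Exception
    {
        Path tmp = Files.createTempFile("component", ".db");
        try (FileChannel in = FileChannel.open(tmp, StandardOpenOption.READ))
        {
            Files.write(tmp, new byte[]{ 1, 2, 3, 4, 5 });
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            long moved = drain(in, Channels.newChannel(sink), 2);
            if (moved != 5 || !Arrays.equals(sink.toByteArray(), new byte[]{ 1, 2, 3, 4, 5 }))
                throw new AssertionError("transfer mismatch");
        }
        finally
        {
            Files.delete(tmp);
        }
    }
}
```

Like the quoted writer, the sketch transfers raw file bytes without deserializing rows, which is the point of the block-streaming optimization under discussion.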

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205297558
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentManifest
+{
+private final LinkedHashMap<Component.Type, Long> manifest;
+private final Set<Component> components = new LinkedHashSet<>(Component.Type.values().length);
+private final long totalSize;
+
+public ComponentManifest(Map<Component.Type, Long> componentManifest)
+{
+this.manifest = new LinkedHashMap<>(componentManifest);
+
+long size = 0;
+for (Map.Entry<Component.Type, Long> entry : this.manifest.entrySet())
+{
+size += entry.getValue();
+this.components.add(Component.parse(entry.getKey().repr));
+}
+
+this.totalSize = size;
+}
+
+public Long getSizeForType(Component.Type type)
+{
+return manifest.get(type);
+}
+
+public long getTotalSize()
+{
+return totalSize;
+}
+
+public Set<Component> getComponents()
+{
+return Collections.unmodifiableSet(components);
+}
+
+public boolean equals(Object o)
+{
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+ComponentManifest that = (ComponentManifest) o;
+return totalSize == that.totalSize &&
+   Objects.equals(manifest, that.manifest);
+}
+
+public int hashCode()
+{
+
+return Objects.hash(manifest, totalSize);
+}
+
+public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
+{
+public void serialize(ComponentManifest manifest, DataOutputPlus 
out, int version) throws IOException
+{
+out.writeInt(manifest.manifest.size());
+for (Map.Entry<Component.Type, Long> entry : manifest.manifest.entrySet())
+serialize(entry.getKey(), entry.getValue(), out);
+}
+
+public ComponentManifest deserialize(DataInputPlus in, int 
version) throws IOException
+{
+LinkedHashMap<Component.Type, Long> components = new LinkedHashMap<>(Component.Type.values().length);
+
+int size = in.readInt();
+assert size >= 0 : "Invalid number of components";
+
+for (int i = 0; i < size; i++)
+{
+Component.Type type = 
Component.Type.fromRepresentation(in.readByte());
+long length = in.readLong();
+components.put(type, length);
+}
+
+return new ComponentManifest(components);
+}
+
+public long serializedSize(ComponentManifest manifest, int version)
+{
+long size = 0;
+size += TypeSizes.sizeof(manifest.manifest.size());
+for (Map.Entry<Component.Type, Long> entry : manifest.manifest.entrySet())
+{
+size += TypeSizes.sizeof(entry.getKey().id);
+size += TypeSizes.sizeof(entry.getValue());
+}
+return size;
+}
+
+private void serialize(Component.Type type, long size, 
DataOutputPlus out) th
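The serializer quoted above writes an entry count followed by one (component-type id, length) pair per component. As a hedged, self-contained sketch of that wire shape — plain `java.io` streams, with a bare `byte` id standing in for Cassandra's `Component.Type` (a simplification for illustration, not the real serializer):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class ManifestWireSketch
{
    // Wire shape: int entryCount, then per entry: byte typeId, long length.
    public static byte[] serialize(Map<Byte, Long> manifest) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(manifest.size());
        for (Map.Entry<Byte, Long> entry : manifest.entrySet())
        {
            out.writeByte(entry.getKey());
            out.writeLong(entry.getValue());
        }
        return bytes.toByteArray();
    }

    public static Map<Byte, Long> deserialize(byte[] wire) throws IOException
    {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
        int size = in.readInt();
        // LinkedHashMap preserves the sender's component order, as in the quoted class.
        Map<Byte, Long> manifest = new LinkedHashMap<>(size);
        for (int i = 0; i < size; i++)
            manifest.put(in.readByte(), in.readLong());
        return manifest;
    }

    public static void main(String[] args) throws Exception
    {
        Map<Byte, Long> manifest = new LinkedHashMap<>();
        manifest.put((byte) 1, 4096L); // hypothetical id for a data component
        manifest.put((byte) 2, 128L);  // hypothetical id for an index component
        Map<Byte, Long> copy = deserialize(serialize(manifest));
        if (!manifest.equals(copy))
            throw new AssertionError("round trip mismatch");
        System.out.println("round trip ok");
    }
}
```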

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205297151
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentManifest
+{
+private final LinkedHashMap<Component.Type, Long> manifest;
--- End diff --

I had originally thought of going in this direction. I have rewritten this 
class and incorporated your suggestion including most stylistic changes.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205297184
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java 
---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentManifest
+{
+private final LinkedHashMap<Component.Type, Long> manifest;
+private final Set<Component> components = new LinkedHashSet<>(Component.Type.values().length);
--- End diff --

Done.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205283093
  
--- Diff: 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+public class BigTableBlockWriter extends SSTable implements 
SSTableMultiWriter
+{
+private final TableMetadataRef metadata;
+private final LifecycleTransaction txn;
+private volatile SSTableReader finalReader;
+private final Map<Component.Type, SequentialWriter> componentWriters;
+
+private final Logger logger = 
LoggerFactory.getLogger(BigTableBlockWriter.class);
+
+private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+                                                                          .trickleFsync(false)
+                                                                          .bufferSize(2 * 1024 * 1024)
+                                                                          .bufferType(BufferType.OFF_HEAP)
+                                                                          .build();
+public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                                  Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                                  Component.DIGEST, Component.CRC);
+
+public BigTableBlockWriter(Descriptor descriptor,
+   TableMetadataRef metadata,
+   LifecycleTransaction txn,
+   final Set<Component> components)
+{
+super(descriptor, ImmutableSet.copyOf(components), metadata,
+  DatabaseDescriptor.getDiskOptimizationStrategy());
+txn.trackNew(this);
+this.metadata = metadata;
+this.txn = txn;
+this.componentWriters = new HashMap<>(components.size());
+
+assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s",
+                                                                   new HashSet(components).removeAll(supportedComponents));
+
+for (Component c : components)
+componentWriters.put(c.type, makeWriter(descriptor, c, 
writerOption));
+}
+
+private static SequentialWriter makeWrit

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205282807
  
--- Diff: 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+public class BigTableBlockWriter extends SSTable implements 
SSTableMultiWriter
+{
+private final TableMetadataRef metadata;
+private final LifecycleTransaction txn;
+private volatile SSTableReader finalReader;
+private final Map<Component.Type, SequentialWriter> componentWriters;
+
+private final Logger logger = 
LoggerFactory.getLogger(BigTableBlockWriter.class);
+
+private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+                                                                          .trickleFsync(false)
+                                                                          .bufferSize(2 * 1024 * 1024)
+                                                                          .bufferType(BufferType.OFF_HEAP)
+                                                                          .build();
+public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                                  Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                                  Component.DIGEST, Component.CRC);
+
+public BigTableBlockWriter(Descriptor descriptor,
+   TableMetadataRef metadata,
+   LifecycleTransaction txn,
+   final Set<Component> components)
+{
+super(descriptor, ImmutableSet.copyOf(components), metadata,
+  DatabaseDescriptor.getDiskOptimizationStrategy());
+txn.trackNew(this);
+this.metadata = metadata;
+this.txn = txn;
+this.componentWriters = new HashMap<>(components.size());
+
+assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s",
+                                                                   new HashSet(components).removeAll(supportedComponents));
--- End diff --

Fixed.
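For readers skimming the thread, the pitfall in the assertion quoted above is that `Set.removeAll` mutates the receiver and returns a boolean ("did the set change"), so feeding its result to the `%s` specifier prints `true`/`false` rather than the offending components. A minimal sketch of the corrected pattern, using plain `java.util` and hypothetical names:

```java
import java.util.HashSet;
import java.util.Set;

public class ComponentCheckSketch
{
    /** Returns the elements of requested that are absent from supported. */
    public static Set<String> unsupported(Set<String> requested, Set<String> supported)
    {
        Set<String> leftover = new HashSet<>(requested);
        // removeAll mutates leftover and returns a boolean, so the boolean is
        // discarded and the mutated set is what gets reported.
        leftover.removeAll(supported);
        return leftover;
    }

    public static void main(String[] args)
    {
        Set<String> supported = Set.of("Data.db", "Index.db", "Statistics.db");
        Set<String> requested = Set.of("Data.db", "Bogus.db");
        Set<String> bad = unsupported(requested, supported);
        if (!bad.equals(Set.of("Bogus.db")))
            throw new AssertionError("expected [Bogus.db], got " + bad);
        System.out.println("unsupported components: " + bad);
    }
}
```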


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205282784
  
--- Diff: 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+public class BigTableBlockWriter extends SSTable implements 
SSTableMultiWriter
+{
+private final TableMetadataRef metadata;
+private final LifecycleTransaction txn;
+private volatile SSTableReader finalReader;
+private final Map<Component.Type, SequentialWriter> componentWriters;
+
+private final Logger logger = 
LoggerFactory.getLogger(BigTableBlockWriter.class);
+
+private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+                                                                          .trickleFsync(false)
+                                                                          .bufferSize(2 * 1024 * 1024)
+                                                                          .bufferType(BufferType.OFF_HEAP)
+                                                                          .build();
+public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                                  Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                                  Component.DIGEST, Component.CRC);
+
+public BigTableBlockWriter(Descriptor descriptor,
+   TableMetadataRef metadata,
+   LifecycleTransaction txn,
+   final Set<Component> components)
+{
+super(descriptor, ImmutableSet.copyOf(components), metadata,
+  DatabaseDescriptor.getDiskOptimizationStrategy());
+txn.trackNew(this);
+this.metadata = metadata;
+this.txn = txn;
--- End diff --

Removed.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205277660
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+private final TableId tableId;
+private final StreamSession session;
+private final int sstableLevel;
+private final SerializationHeader.Component header;
+private final int fileSeqNum;
+private final ComponentManifest manifest;
+private final SSTableFormat.Type format;
+private final Version version;
+private final DecoratedKey firstKey;
+
+public CassandraBlockStreamReader(StreamMessageHeader header, 
CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair
+// sstables if the session has a pending repair id
+if (!session.getPendingRepair().equals(header.pendingRepair))
+throw new IllegalStateException(String.format("Stream 
Session & SSTable ({}) pendingRepair UUID mismatch.",
+  
header.tableId));
+}
+this.session = session;
+this.tableId = header.tableId;
+this.manifest = streamHeader.componentManifest;
+this.sstableLevel = streamHeader.sstableLevel;
+this.header = streamHeader.header;
+this.format = streamHeader.format;
+this.fileSeqNum = header.sequenceNumber;
+this.version = streamHeader.version;
+this.firstKey = streamHeader.firstKey;
+}
+
+/**
+ * @param inputPlus where this reads data from
+ * @return SSTable transferred
+ * @throws IOException if reading the remote sstable fails. Will throw 
an RTE if local write fails.
+ */
+@SuppressWarnings("resource") // input needs to remain open, streams 
on top of it can't be closed
+@Override
+public SSTableMultiWriter read(DataInputPlus inputPlus) throws 
IOException
+{
+long totalSize = manifest.getTotalSize();
+
+ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+if (cfs == nul

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205277545
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+private final TableId tableId;
+private final StreamSession session;
+private final int sstableLevel;
+private final SerializationHeader.Component header;
--- End diff --

Cherry picked.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205277492
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+private final TableId tableId;
+private final StreamSession session;
+private final int sstableLevel;
+private final SerializationHeader.Component header;
+private final int fileSeqNum;
+private final ComponentManifest manifest;
+private final SSTableFormat.Type format;
+private final Version version;
+private final DecoratedKey firstKey;
+
+public CassandraBlockStreamReader(StreamMessageHeader header, 
CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair
+// sstables if the session has a pending repair id
+if (!session.getPendingRepair().equals(header.pendingRepair))
+throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", header.tableId));
+}
+this.session = session;
+this.tableId = header.tableId;
+this.manifest = streamHeader.componentManifest;
+this.sstableLevel = streamHeader.sstableLevel;
+this.header = streamHeader.header;
+this.format = streamHeader.format;
+this.fileSeqNum = header.sequenceNumber;
+this.version = streamHeader.version;
+this.firstKey = streamHeader.firstKey;
+}
+
+/**
+ * @param inputPlus where this reads data from
+ * @return SSTable transferred
+ * @throws IOException if reading the remote sstable fails. Will throw 
an RTE if local write fails.
+ */
+@SuppressWarnings("resource") // input needs to remain open, streams 
on top of it can't be closed
+@Override
+public SSTableMultiWriter read(DataInputPlus inputPlus) throws 
IOException
+{
+long totalSize = manifest.getTotalSize();
+
+ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+if (cfs == nul

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205276519
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+private final TableId tableId;
+private final StreamSession session;
+private final int sstableLevel;
+private final SerializationHeader.Component header;
+private final int fileSeqNum;
+private final ComponentManifest manifest;
+private final SSTableFormat.Type format;
+private final Version version;
+private final DecoratedKey firstKey;
+
+public CassandraBlockStreamReader(StreamMessageHeader header, 
CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair
+// sstables if the session has a pending repair id
+if (!session.getPendingRepair().equals(header.pendingRepair))
+throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.",
--- End diff --

Great catch. Yes, it used to be a logging call.


---
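The bug flagged above is easy to reproduce: `String.format` uses `java.util.Formatter` specifiers (`%s`), so an SLF4J-style `{}` placeholder left over from a logging call passes through verbatim and the argument is silently dropped. A minimal standalone demo (my own illustration, not the PR's code):

```java
public class PlaceholderDemo
{
    public static void main(String[] args)
    {
        String tableId = "5bc52802-de25-35ed-aeab-188eecebb090";

        // Leftover logger syntax: "{}" is not a format specifier, and the
        // extra argument is silently ignored by String.format.
        String broken = String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", tableId);

        // java.util.Formatter syntax substitutes the argument via "%s".
        String fixed = String.format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", tableId);

        System.out.println(broken); // "{}" survives unreplaced; tableId is lost
        System.out.println(fixed);
    }
}
```

Unlike C's `printf`, `String.format` does not even throw on the unused argument here, which is why this class of bug slips through until someone reads the message.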




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205276269
  
--- Diff: 
src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+public class BigTableBlockWriter extends SSTable implements 
SSTableMultiWriter
+{
+private final TableMetadataRef metadata;
+private final LifecycleTransaction txn;
+private volatile SSTableReader finalReader;
+private final Map<Component.Type, SequentialWriter> componentWriters;
+
+private final Logger logger = 
LoggerFactory.getLogger(BigTableBlockWriter.class);
+
+private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+                                                                          .trickleFsync(false)
+                                                                          .bufferSize(2 * 1024 * 1024)
+                                                                          .bufferType(BufferType.OFF_HEAP)
+                                                                          .build();
+public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                                  Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                                  Component.DIGEST, Component.CRC);
+
+public BigTableBlockWriter(Descriptor descriptor,
+   TableMetadataRef metadata,
+   LifecycleTransaction txn,
+   final Set<Component> components)
+{
+super(descriptor, ImmutableSet.copyOf(components), metadata,
+  DatabaseDescriptor.getDiskOptimizationStrategy());
+txn.trackNew(this);
+this.metadata = metadata;
+this.txn = txn;
+this.componentWriters = new HashMap<>(components.size());
+
+assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s",
+                                                                   new HashSet<>(components).removeAll(supportedComponents));
+
+for (Component c : components)
+componentWriters.put(c.type, makeWriter(descriptor, c, writerOption));
+}
+
+private static SequentialWriter makeWrit

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205276122
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+private final TableId tableId;
+private final StreamSession session;
+private final int sstableLevel;
+private final SerializationHeader.Component header;
+private final int fileSeqNum;
+private final ComponentManifest manifest;
+private final SSTableFormat.Type format;
+private final Version version;
+private final DecoratedKey firstKey;
+
+public CassandraBlockStreamReader(StreamMessageHeader header, 
CassandraStreamHeader streamHeader, StreamSession session)
+{
+if (session.getPendingRepair() != null)
+{
+// we should only ever be streaming pending repair
+// sstables if the session has a pending repair id
+if (!session.getPendingRepair().equals(header.pendingRepair))
+throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", header.tableId));
+}
+this.session = session;
+this.tableId = header.tableId;
+this.manifest = streamHeader.componentManifest;
+this.sstableLevel = streamHeader.sstableLevel;
+this.header = streamHeader.header;
+this.format = streamHeader.format;
+this.fileSeqNum = header.sequenceNumber;
+this.version = streamHeader.version;
+this.firstKey = streamHeader.firstKey;
+}
+
+/**
+ * @param inputPlus where this reads data from
+ * @return SSTable transferred
+ * @throws IOException if reading the remote sstable fails. Will throw 
an RTE if local write fails.
+ */
+@SuppressWarnings("resource") // input needs to remain open, streams 
on top of it can't be closed
+@Override
+public SSTableMultiWriter read(DataInputPlus inputPlus) throws 
IOException
+{
+long totalSize = manifest.getTotalSize();
+
+ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId);
+
+if (cfs == nul

[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r205275130
  
--- Diff: src/java/org/apache/cassandra/db/compaction/Verifier.java ---
@@ -361,12 +361,26 @@ public RangeOwnHelper(List<Range<Token>> normalizedRanges)
  * @throws RuntimeException if the key is not contained
  */
 public void check(DecoratedKey key)
+{
+if (!checkBoolean(key))
--- End diff --

I am cherry-picking this change.

My only concern is that we're now changing the signature and behavior of a 
pre-existing method. I generally avoid such changes so as not to confuse other 
authors.


---
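The refactoring discussed above — layering the pre-existing throwing `check(...)` over a new boolean-returning `checkBoolean(...)` — can be sketched as follows. The range representation is a simplified stand-in for Cassandra's `Range<Token>`, and the bodies are illustrative, not the PR's exact code:

```java
import java.util.List;

class RangeOwnHelper
{
    // Simplified stand-in for List<Range<Token>>: each long[] is (left, right].
    private final List<long[]> normalizedRanges;

    RangeOwnHelper(List<long[]> normalizedRanges)
    {
        this.normalizedRanges = normalizedRanges;
    }

    /** Pre-existing API: throws if the key is not contained in any range. */
    public void check(long key)
    {
        if (!checkBoolean(key))
            throw new RuntimeException("Key " + key + " is not contained in the given ranges");
    }

    /** New API: reports containment without the cost of an exception. */
    public boolean checkBoolean(long key)
    {
        for (long[] range : normalizedRanges)
            if (key > range[0] && key <= range[1])
                return true;
        return false;
    }
}
```

Keeping `check` as a thin wrapper preserves the existing signature for current callers, while hot paths — such as scanning every key of an SSTable to decide whether it is fully contained in the requested ranges — can use the boolean variant without paying for exception construction.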




[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/239
  
@iamaleksey I *think* I've resolved all your comments. The last dtest run was 
all green. I'm not sure about the latest push, but I don't foresee any 
breakages. Please let me know if there are more changes required.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-25 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204998639
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
@@ -62,7 +90,23 @@ public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref,
 sections,
 sstable.compression ? sstable.getCompressionMetadata() : null,
 keepSSTableLevel ? sstable.getSSTableLevel() : 0,
-sstable.header.toComponent());
+sstable.header.toComponent(), manifest, shouldStreamFullSSTable(),
+sstable.first,
+sstable.metadata().id);
+}
+
+@VisibleForTesting
+public static ComponentManifest getComponentManifest(SSTableReader sstable)
+{
+LinkedHashMap<Component, Long> components = new LinkedHashMap<>(STREAM_COMPONENTS.size());
+for (Component component : STREAM_COMPONENTS)
--- End diff --

@iamaleksey would it be worth having a list of required components here? Would 
a missing component be fatal? For example, when I reset the level on the 
receiving side, I expect the stats component to exist and to have been sent over.


---
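One way to express the "required components" idea raised above is a fail-fast validation of the received manifest. Everything below — the enum, the `REQUIRED` set, the method name — is a hypothetical sketch, not Cassandra's actual API:

```java
import java.util.EnumSet;
import java.util.Map;

class ManifestCheck
{
    // Hypothetical stand-in for Component.Type.
    enum ComponentType { DATA, PRIMARY_INDEX, STATS, COMPRESSION_INFO, FILTER, SUMMARY, DIGEST, CRC }

    // Components the receiver cannot proceed without; e.g. STATS would be
    // needed to reset the SSTable level on the receiving side.
    static final EnumSet<ComponentType> REQUIRED = EnumSet.of(ComponentType.DATA, ComponentType.STATS);

    static void validate(Map<ComponentType, Long> manifest)
    {
        // Fail fast with the exact set of missing components in the message.
        EnumSet<ComponentType> missing = EnumSet.copyOf(REQUIRED);
        missing.removeAll(manifest.keySet());
        if (!missing.isEmpty())
            throw new IllegalStateException("Missing required streaming components: " + missing);
    }
}
```

Validating at manifest-deserialization time surfaces the problem as a clear stream-setup error instead of a later `FileNotFoundException` halfway through receiving data.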




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-24 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204955316
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---
@@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version)
 CassandraStreamHeader.serializer.serialize(header, out, version);
 out.flush();
 
-CassandraStreamWriter writer = header.compressionInfo == null ?
-   new CassandraStreamWriter(sstable, header.sections, session) :
-   new CompressedCassandraStreamWriter(sstable, header.sections, header.compressionInfo, session);
+IStreamWriter writer;
+if (shouldStreamFullSSTable())
+{
+writer = new CassandraBlockStreamWriter(sstable, session, components);
+}
+else
+{
+writer = (header.compressionInfo == null) ?
+ new CassandraStreamWriter(sstable, header.sections, session) :
+ new CompressedCassandraStreamWriter(sstable, header.sections, header.compressionInfo, session);
+}
 writer.write(out);
 }
 
+@VisibleForTesting
+public boolean shouldStreamFullSSTable()
+{
+return isFullSSTableTransfersEnabled && isFullyContained;
+}
+
+@VisibleForTesting
+public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable)
+{
+if (normalizedRanges == null)
+return false;
+
+RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges);
+try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata()))
+{
+while (iter.hasNext())
+{
+DecoratedKey key = iter.next();
+try
+{
+rangeOwnHelper.check(key);
+} catch(RuntimeException e)
--- End diff --

I will disable Zero Copy streaming for STCS and I've created 
[CASSANDRA-14586](https://issues.apache.org/jira/browse/CASSANDRA-14586) to 
further discuss fixes.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-24 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204955090
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentInfo
--- End diff --

I have refactored this to `ComponentManifest`


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-24 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204955043
  
--- Diff: src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---
@@ -66,7 +66,7 @@
 private final LogReplicaSet replicas = new LogReplicaSet();
 
 // The transaction records, this set must be ORDER PRESERVING
-private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>();
+private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554
--- End diff --

I should've worded the comment differently, but it is a hack to avoid a 
`ConcurrentModificationException` 
([CASSANDRA-14554](https://issues.apache.org/jira/browse/CASSANDRA-14554)). 
The issue already exists in trunk and was not introduced by this PR, hence 
the separate ticket.


---
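A small demo of what that workaround does and does not buy (my own illustration, not the `LogFile` code): `Collections.synchronizedSet` preserves the wrapped `LinkedHashSet`'s insertion order and makes individual mutator calls thread-safe, but iteration still requires holding the wrapper's monitor explicitly — so it only papers over concurrent modification for non-iterating access.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class SyncSetDemo
{
    // Iteration over a synchronizedSet is NOT implicitly synchronized;
    // take a consistent snapshot while holding the wrapper's own lock.
    static List<String> snapshotOf(Set<String> records)
    {
        synchronized (records)
        {
            return new ArrayList<>(records);
        }
    }

    public static void main(String[] args)
    {
        Set<String> records = Collections.synchronizedSet(new LinkedHashSet<>());
        records.add("ADD:nb-1-big-Data.db");  // each mutator locks internally
        records.add("COMMIT");
        records.add("ADD:nb-1-big-Data.db");  // duplicate: still set semantics

        System.out.println(snapshotOf(records)); // order-preserving, deduplicated
    }
}
```

Any code path that iterates the set (e.g. writing all records out) must use the `synchronized (records)` idiom above, which is presumably why the wrapper is described as a hack pending a proper fix.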




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-24 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204954805
  
--- Diff: 
src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.Collectors3;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to 
disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+private static final Logger logger = 
LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+protected final TableId tableId;
+protected final StreamSession session;
+protected final int sstableLevel;
--- End diff --

Cleaned up the protected fields. I still need to reset the level.


---




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-24 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204954749
  
--- Diff: src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---
@@ -2260,6 +2260,20 @@ public static int getStreamingConnectionsPerHost()
 return conf.streaming_connections_per_host;
 }
 
+public static boolean isFullSSTableTransfersEnabled()
+{
+if (conf.server_encryption_options.enabled || 
conf.server_encryption_options.optional)
+{
+logger.debug("Internode encryption enabled. Disabling zero 
copy SSTable transfers for streaming.");
--- End diff --

Done


---
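The guard under review reduces to a few lines; this is a hedged sketch with illustrative field names, not `DatabaseDescriptor`'s actual code. The rationale: zero-copy transfers hand raw file regions to the socket, bypassing the stream transformation pipeline, so they must be off whenever internode TLS is enabled or even optional.

```java
class StreamingConfig
{
    // Illustrative stand-ins for server_encryption_options and the
    // zero-copy streaming toggle.
    boolean serverEncryptionEnabled;
    boolean serverEncryptionOptional;
    boolean fullSSTableTransfersEnabled = true;

    boolean isFullSSTableTransfersEnabled()
    {
        // Raw file-region transfers would bypass TLS, so never allow them
        // when internode encryption is (or may be) in effect.
        if (serverEncryptionEnabled || serverEncryptionOptional)
            return false;
        return fullSSTableTransfersEnabled;
    }
}
```

Treating "optional" encryption the same as "enabled" is the conservative choice: a connection might be negotiated with TLS even when encryption is merely optional.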




[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming

2018-07-24 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/239#discussion_r204954729
  
--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentInfo
+{
+final Component.Type type;
+final long length;
+
+public ComponentInfo(Component.Type type, long length)
+{
+assert length >= 0 : "Component length cannot be negative";
+this.type = type;
+this.length = length;
+}
+
+@Override
+public String toString()
+{
+return "ComponentInfo{" +
+   "type=" + type +
+   ", length=" + length +
+   '}';
+}
+
+public boolean equals(Object o)
--- End diff --

`CassandraStreamHeader`'s `equals()` and `hashCode()` methods are not dead 
code. They're used in `CassandraStreamHeaderTest::serializerTest`, which 
eventually ends up in 
[`SerializationUtils::assertSerializationCycle`](https://github.com/apache/cassandra/blob/9714a7c817b64a3358f69e536535c756c5c6df48/test/unit/org/apache/cassandra/serializers/SerializationUtils.java#L60).
 This is what originally forced me to implement those methods. The only other 
choice was to test equality for `ComponentInfo` inside 
`CassandraStreamHeader::equals()`, which would have been a bad choice.


---
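The round-trip test shape that forces those methods to exist can be sketched like this, with a toy string serializer standing in for Cassandra's `IVersionedSerializer` (names and bodies are illustrative). An `assertSerializationCycle`-style check compares the deserialized object to the original, which is only meaningful with value-based `equals()`/`hashCode()`:

```java
import java.util.Objects;

class ComponentInfo
{
    final String type;
    final long length;

    ComponentInfo(String type, long length)
    {
        this.type = type;
        this.length = length;
    }

    @Override
    public boolean equals(Object o)
    {
        // Value equality over both fields; identity equality would make
        // any serialize/deserialize round-trip assertion fail.
        if (!(o instanceof ComponentInfo))
            return false;
        ComponentInfo other = (ComponentInfo) o;
        return length == other.length && Objects.equals(type, other.type);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(type, length);
    }

    // Toy round trip standing in for serializer.serialize()/deserialize().
    String serialize()
    {
        return type + ":" + length;
    }

    static ComponentInfo deserialize(String s)
    {
        String[] parts = s.split(":");
        return new ComponentInfo(parts[0], Long.parseLong(parts[1]));
    }
}
```

Without these overrides, `assertEquals(original, deserialize(serialize(original)))` would compare object identity and always fail, even when every field round-trips correctly.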




[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming

2018-07-24 Thread dineshjoshi
Github user dineshjoshi commented on the issue:

https://github.com/apache/cassandra/pull/239
  
@iamaleksey I have resolved most of your comments. I still have a couple to 
go. I will update this PR when I am done with those and get a clean dtest run.


---



