[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r245848691 --- Diff: upgrade_tests/thrift_upgrade_test.py --- @@ -350,29 +469,41 @@ def test_sparse_supercolumn_with_renames(self): client.transport.open() client.set_keyspace('ks') -cf = _create_sparse_super_cf('sparse_super_1') -client.system_add_column_family(cf) +_create_sparse_super_cf(client, 'sparse_super_1') + +#The alter after was failing claiming the column family didn't exist. +#This test is so slow and it's Thrift and going away "soon" and we +#Don't run it on every commit so just sleep +import time +time.sleep(5) --- End diff -- Do we want to sleep for a fixed duration or keep retrying patiently until it succeeds? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
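On the sleep-versus-retry question above: a bounded polling helper is one way to avoid both the fixed delay and an unbounded wait. A hypothetical sketch (the helper name, timeout values, and the `cf_exists` probe mentioned below are assumptions, not part of the dtest codebase):

```python
import time

def wait_until(predicate, timeout=30.0, interval=0.5):
    """Poll predicate() until it returns truthy, or fail after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    raise TimeoutError("condition not met within %.1fs" % timeout)
```

With something like this, the fixed `time.sleep(5)` could become e.g. `wait_until(lambda: cf_exists(client, 'sparse_super_1'))`, succeeding as soon as schema agreement lands and failing loudly if it never does.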
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r245846732 --- Diff: requirements.txt --- @@ -1,6 +1,6 @@ -e git+https://github.com/datastax/python-driver.git@cassandra-test#egg=cassandra-driver # Used ccm version is tracked by cassandra-test branch in ccm repo. Please create a PR there for fixes or upgrades to new releases. --e git+https://github.com/riptano/ccm.git@cassandra-test#egg=ccm +-e git+https://github.com/aweisberg/ccm.git@cassandra-test#egg=ccm --- End diff -- Could you ensure that you change this when outstanding changes in upstream `ccm` are merged? ---
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r245848315 --- Diff: upgrade_tests/thrift_upgrade_test.py --- @@ -10,99 +10,182 @@ ColumnParent, ConsistencyLevel, SlicePredicate, SliceRange) from thrift_test import _i64, get_thrift_client -from tools.assertions import assert_length_equal +from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal +from tools.misc import wait_for_agreement, add_skip from .upgrade_base import UpgradeTester from .upgrade_manifest import build_upgrade_pairs since = pytest.mark.since logger = logging.getLogger(__name__) -def _create_dense_super_cf(name): -return Cassandra.CfDef('ks', name, column_type='Super', +def _create_dense_super_cf(thrift, name): +cfdef = Cassandra.CfDef('ks', name, column_type='Super', key_validation_class='AsciiType',# pk comparator_type='AsciiType', # ck default_validation_class='AsciiType',# SC value subcomparator_type='LongType') # SC key +thrift.system_add_column_family(cfdef) +wait_for_agreement(thrift) -def _create_sparse_super_cf(name): -cd1 = ColumnDef('col1', 'LongType', None, None) -cd2 = ColumnDef('col2', 'LongType', None, None) -return Cassandra.CfDef('ks', name, column_type='Super', +def _create_sparse_super_cf(thrift, name): +cd1 = ColumnDef('col1'.encode(), 'LongType', None, None) +cd2 = ColumnDef('col2'.encode(), 'LongType', None, None) +cfdef = Cassandra.CfDef('ks', name, column_type='Super', column_metadata=[cd1, cd2], key_validation_class='AsciiType', comparator_type='AsciiType', subcomparator_type='AsciiType') +thrift.system_add_column_family(cfdef) +wait_for_agreement(thrift) -def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key'): +def unpack(list): --- End diff -- `list` is a Python keyword - https://docs.python.org/3/tutorial/datastructures.html#more-on-lists You should not use `list` as a variable name. 
Although Python doesn't complain (strictly speaking, `list` is a built-in type rather than a reserved keyword), shadowing it is not good practice. ---
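To make the hazard concrete: once a parameter is named `list`, the built-in type is unreachable inside that function. A toy illustration (not code from the patch):

```python
def unpack(list):              # parameter shadows the built-in type
    return list(range(3))      # calls the argument, not the built-in!

def unpack_fixed(items):       # descriptive, non-shadowing name
    return list(items)         # the real built-in is available again

try:
    unpack([1, 2, 3])          # raises: 'list' object is not callable
except TypeError:
    pass
```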
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r245846129 --- Diff: conftest.py --- @@ -409,13 +429,16 @@ def pytest_collection_modifyitems(items, config): This function is called upon during the pytest test collection phase and allows for modification of the test items within the list """ -if not config.getoption("--collect-only") and config.getoption("--cassandra-dir") is None: -if config.getoption("--cassandra-version") is None: +collect_only = config.getoption("--collect-only") +cassandra_dir = config.getoption("--cassandra-dir") +cassandra_version = config.getoption("--cassandra-version") +if not collect_only and cassandra_dir is None: +if cassandra_version is None: --- End diff -- Nit: Extra space `if cassandra_version`. Please fix on commit. ---
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r245848375 --- Diff: upgrade_tests/thrift_upgrade_test.py --- @@ -10,99 +10,182 @@ ColumnParent, ConsistencyLevel, SlicePredicate, SliceRange) from thrift_test import _i64, get_thrift_client -from tools.assertions import assert_length_equal +from tools.assertions import assert_length_equal, assert_lists_of_dicts_equal +from tools.misc import wait_for_agreement, add_skip from .upgrade_base import UpgradeTester from .upgrade_manifest import build_upgrade_pairs since = pytest.mark.since logger = logging.getLogger(__name__) -def _create_dense_super_cf(name): -return Cassandra.CfDef('ks', name, column_type='Super', +def _create_dense_super_cf(thrift, name): +cfdef = Cassandra.CfDef('ks', name, column_type='Super', key_validation_class='AsciiType',# pk comparator_type='AsciiType', # ck default_validation_class='AsciiType',# SC value subcomparator_type='LongType') # SC key +thrift.system_add_column_family(cfdef) +wait_for_agreement(thrift) -def _create_sparse_super_cf(name): -cd1 = ColumnDef('col1', 'LongType', None, None) -cd2 = ColumnDef('col2', 'LongType', None, None) -return Cassandra.CfDef('ks', name, column_type='Super', +def _create_sparse_super_cf(thrift, name): +cd1 = ColumnDef('col1'.encode(), 'LongType', None, None) +cd2 = ColumnDef('col2'.encode(), 'LongType', None, None) +cfdef = Cassandra.CfDef('ks', name, column_type='Super', column_metadata=[cd1, cd2], key_validation_class='AsciiType', comparator_type='AsciiType', subcomparator_type='AsciiType') +thrift.system_add_column_family(cfdef) +wait_for_agreement(thrift) -def _validate_sparse_cql(cursor, cf='sparse_super_1', column1='column1', col1='col1', col2='col2', key='key'): +def unpack(list): +result_list = [] +for item_dict in list: +normalized_dict = {} +for key, value in item_dict.items(): +if hasattr(value, "items"): +assert(key == '') +for a, b in value.items(): 
+normalized_dict[a] = b +else: +normalized_dict[key] = value +result_list.append(normalized_dict) +return result_list + + +def add_value(list): --- End diff -- Please rename `list` variable. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r245846879 --- Diff: sstable_generation_loading_test.py --- @@ -27,45 +28,68 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup): fixture_dtest_setup.allow_log_errors = True upgrade_from = None -compact = False +test_compact = False + +def compact(self): +return self.fixture_dtest_setup.cluster.version() < MAJOR_VERSION_4 and self.test_compact def create_schema(self, session, ks, compression): create_ks(session, ks, rf=2) -create_cf(session, "standard1", compression=compression, compact_storage=self.compact) +create_cf(session, "standard1", compression=compression, compact_storage=self.compact()) create_cf(session, "counter1", compression=compression, columns={'v': 'counter'}, - compact_storage=self.compact) + compact_storage=self.compact()) def test_sstableloader_compression_none_to_none(self): +if self.__class__.__name__ != 'TestBasedSSTableLoader' and self.upgrade_from is None: --- End diff -- These two lines are redundant. Perhaps refactor them into a small method and reuse it? ---
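The suggested refactor might look like this: hoist the duplicated guard into one method that every test calls. A sketch under assumed names (the class and helper names are hypothetical, not the actual dtest classes):

```python
class UpgradeAwareLoaderTest:
    """Hypothetical base class illustrating the hoisted guard."""
    upgrade_from = None

    def _skip_as_upgrade_variant(self):
        # The two duplicated lines from each test method, in one place:
        # skip when running as an upgrade subclass without an upgrade path.
        return (self.__class__.__name__ != 'UpgradeAwareLoaderTest'
                and self.upgrade_from is None)

    def test_sstableloader_compression_none_to_none(self):
        if self._skip_as_upgrade_variant():
            return  # in the real suite this would be pytest.skip(...)
        # ... test body ...
```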
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r243218570 --- Diff: upgrade_tests/upgrade_through_versions_test.py --- @@ -794,31 +800,33 @@ def create_upgrade_class(clsname, version_metas, protocol_version, MultiUpgrade = namedtuple('MultiUpgrade', ('name', 'version_metas', 'protocol_version', 'extra_config')) MULTI_UPGRADES = ( -# Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.1, trunk) -MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_EndsAt_Trunk_HEAD', - version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3, extra_config=None), - MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD', - version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_x], protocol_version=3, +# Proto v3 upgrades (v3 is supported on 2.1, 2.2, 3.0, 3.11) +MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_EndsAt_3_11_X', + version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3, extra_config=None), + MultiUpgrade(name='TestProtoV3Upgrade_AllVersions_RandomPartitioner_EndsAt_3_11_X_HEAD', + version_metas=[current_2_1_x, current_2_2_x, current_3_0_x, indev_3_11_x], protocol_version=3, extra_config=( ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), )), # Proto v4 upgrades (v4 is supported on 2.2, 3.0, 3.1, trunk) MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_EndsAt_Trunk_HEAD', - version_metas=[current_2_2_x, current_3_0_x, indev_3_x, ], protocol_version=4, extra_config=None), + version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4, extra_config=None), MultiUpgrade(name='TestProtoV4Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD', - version_metas=[current_2_2_x, current_3_0_x, indev_3_x], protocol_version=4, + version_metas=[current_2_2_x, current_3_0_x, current_3_11_x, indev_trunk], protocol_version=4, extra_config=( 
('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), )), +#Beta versions don't work with this test since it doesn't specify use beta in the client +#It's fine I guess for now? Can update on release # Proto v5 upgrades (v5 is supported on 3.0, 3.11, trunk) -MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_EndsAt_Trunk_HEAD', - version_metas=[current_3_0_x, current_3_x, indev_trunk], protocol_version=5, extra_config=None), - MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_RandomPartitioner_EndsAt_Trunk_HEAD', - version_metas=[current_3_0_x, current_3_x, indev_trunk], protocol_version=5, - extra_config=( - ('partitioner', 'org.apache.cassandra.dht.RandomPartitioner'), - )), +# MultiUpgrade(name='TestProtoV5Upgrade_AllVersions_EndsAt_Trunk_HEAD', --- End diff -- Do we want to leave commented code in? It is ok if this is transient and we'll eventually remove it. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r241232030 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; --- End diff -- From my brief reading of the JNI Bindings it seems that zstd-jni uses `ZSTD_compress` and not `ZSTD_compressCCtx`. The latter allows you to pass in a context which would've been more efficient. For now, I think it would be ok to simply pass in the compression level. The only value of keeping multiple compressor objects around is to retain the `CompressionParams`. It would be useful to have a caching factory to avoid creating multiple objects with the same compression params. As far as thread safety goes, the JNI Code looks thread safe specifically compress and decompress methods. As pointed out earlier they create and destroy the Compression Context on each invocation which is memory unfriendly. So, although the zstd benchmarks may look great, I am not so sure about this JNI binding. We should definitely add a JMH Benchmark for zstd. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r240824209 --- Diff: sstable_generation_loading_test.py --- @@ -141,6 +166,16 @@ def load_sstable_with_configuration(self, pre_compression=None, post_compression session.execute("UPDATE standard1 SET v='{}' WHERE KEY='{}' AND c='col'".format(i, i)) session.execute("UPDATE counter1 SET v=v+1 WHERE KEY='{}'".format(i)) +#Will upgrade to a version that doesn't support compact storage so revert the compact +#storage, this doesn't actually fix it yet +if self.compact() and default_install_version >= '4': --- End diff -- Use `LooseVersion` here as well? Prefer constants over magic strings / numbers. ---
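Why `LooseVersion` (or any component-wise comparison) matters in these checks: comparing version strings directly is lexicographic and misorders multi-digit components. A minimal demonstration with a tiny stand-in key function (`version_key` is illustrative, not a dtest helper; the real suites used `distutils.version.LooseVersion`):

```python
def version_key(version):
    # Stand-in for LooseVersion: compare numeric components, not characters.
    return tuple(int(part) for part in version.split('.'))

# String comparison is lexicographic, so multi-digit components misorder:
assert '3.11' < '3.9'                            # wrong for versions: '1' < '9'
# Component-wise comparison gets it right:
assert version_key('3.11') > version_key('3.9')
assert version_key('4.0') > version_key('3.11')
```

Wrapping both sides of a comparison in `LooseVersion` (or an equivalent key) avoids this class of bug once versions like 3.11 or a hypothetical 10.0 appear.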
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r240827199 --- Diff: upgrade_tests/paging_test.py --- @@ -503,22 +513,31 @@ def test_basic_compound_paging(self): ); """) -cursor.execute(""" -CREATE TABLE test2 ( -k int, -c1 int, -c2 int, -v text, -PRIMARY KEY (k, c1, c2) -) WITH COMPACT STORAGE; -""") +testing_compact_storage = self.cluster.version() < '4.0' +if testing_compact_storage: +cursor.execute(""" +CREATE TABLE test2 ( +k int, +c1 int, +c2 int, +v text, +PRIMARY KEY (k, c1, c2) +) WITH COMPACT STORAGE; +""") + +version_string = self.upgrade_version_string() +#4.0 doesn't support compact storage +if version_string == 'trunk' or version_string >= '4.0': --- End diff -- Use `LooseVersion` here as well? ---
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r240827110 --- Diff: upgrade_tests/paging_test.py --- @@ -457,21 +458,30 @@ def test_basic_paging(self): ); """) -cursor.execute(""" -CREATE TABLE test2 ( -k int, -c int, -v text, -PRIMARY KEY (k, c) -) WITH COMPACT STORAGE; -""") +testing_compact_storage = self.cluster.version() < '4.0' --- End diff -- Use `LooseVersion` here as well? ---
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r240823234 --- Diff: dtest_setup.py --- @@ -454,22 +463,14 @@ def maybe_setup_jacoco(self, cluster_name='test'): @staticmethod def create_ccm_cluster(dtest_setup): -logger.debug("cluster ccm directory: " + dtest_setup.test_path) +logger.info("cluster ccm directory: " + dtest_setup.test_path) --- End diff -- Does this need to be info level? ---
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r240827177 --- Diff: upgrade_tests/paging_test.py --- @@ -503,22 +513,31 @@ def test_basic_compound_paging(self): ); """) -cursor.execute(""" -CREATE TABLE test2 ( -k int, -c1 int, -c2 int, -v text, -PRIMARY KEY (k, c1, c2) -) WITH COMPACT STORAGE; -""") +testing_compact_storage = self.cluster.version() < '4.0' --- End diff -- Use `LooseVersion` here as well? ---
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r240823966 --- Diff: sstable_generation_loading_test.py --- @@ -27,45 +27,68 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup): fixture_dtest_setup.allow_log_errors = True upgrade_from = None -compact = False +test_compact = False + +def compact(self): +return self.fixture_dtest_setup.cluster.version() < '4' and self.test_compact --- End diff -- Use `LooseVersion` instead of a string? ---
[GitHub] cassandra-dtest pull request #41: 14421
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra-dtest/pull/41#discussion_r240828717 --- Diff: upgrade_tests/upgrade_manifest.py --- @@ -70,22 +89,19 @@ def clone_with_local_env_version(self): return self -indev_2_0_x = None # None if release not likely -current_2_0_x = VersionMeta(name='current_2_0_x', family='2.0.x', variant='current', version='2.0.17', min_proto_v=1, max_proto_v=2, java_versions=(7,)) - indev_2_1_x = VersionMeta(name='indev_2_1_x', family='2.1.x', variant='indev', version='github:apache/cassandra-2.1', min_proto_v=1, max_proto_v=3, java_versions=(7, 8)) -current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1.x', variant='current', version='2.1.17', min_proto_v=1, max_proto_v=3, java_versions=(7, 8)) +current_2_1_x = VersionMeta(name='current_2_1_x', family='2.1.x', variant='current', version='2.1.20', min_proto_v=1, max_proto_v=3, java_versions=(7, 8)) indev_2_2_x = VersionMeta(name='indev_2_2_x', family='2.2.x', variant='indev', version='github:apache/cassandra-2.2', min_proto_v=1, max_proto_v=4, java_versions=(7, 8)) -current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2.x', variant='current', version='2.2.9', min_proto_v=1, max_proto_v=4, java_versions=(7, 8)) +current_2_2_x = VersionMeta(name='current_2_2_x', family='2.2.x', variant='current', version='2.2.13', min_proto_v=1, max_proto_v=4, java_versions=(7, 8)) -indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0.x', variant='indev', version='github:apache/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,)) -current_3_0_x = VersionMeta(name='current_3_0_x', family='3.0.x', variant='current', version='3.0.12', min_proto_v=3, max_proto_v=4, java_versions=(8,)) +indev_3_0_x = VersionMeta(name='indev_3_0_x', family='3.0.x', variant='indev', version='github:aweisberg/cassandra-3.0', min_proto_v=3, max_proto_v=4, java_versions=(8,)) --- End diff -- This needs to be reverted? 
---
[GitHub] cassandra pull request #290: 14841 trunk
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/290#discussion_r229892479 --- Diff: src/java/org/apache/cassandra/gms/Gossiper.java --- @@ -136,6 +139,38 @@ private volatile long lastProcessedMessageAt = System.currentTimeMillis(); +private boolean haveMajorVersion3Nodes = true; + +final com.google.common.base.Supplier haveMajorVersion3NodesSupplier = () -> +{ +//Once there are no prior version nodes we don't need to keep rechecking +if (!haveMajorVersion3Nodes) +return false; + +Iterable allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers()); +CassandraVersion referenceVersion = null; + +for (InetAddressAndPort host : allHosts) +{ +CassandraVersion version = getReleaseVersion(host); + +//Raced with changes to gossip state +if (version == null) +continue; + +if (referenceVersion == null) +referenceVersion = version; + +if (version.major < 4) --- End diff -- This will capture nodes with major version 2 as well. If that is ok, we should probably rename variables to reflect that. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/279 Hi @jonmeredith, the changes look good. I just had a minor comment but other than that, I think we're good. ---
[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/279#discussion_r227581961 --- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java --- @@ -36,9 +36,34 @@ import static org.junit.Assert.*; +/** + * Long BufferPool test - make sure that the BufferPool allocates and recycles + * ByteBuffers under heavy concurrent usage. + * + * The test creates two groups of threads + * + * - the burn producer/consumer pair that allocates 1/10 poolSize and then returns + * all the memory to the pool. 50% is freed by the producer, 50% passed to the consumer thread. + * + * - a ring of worker threads that allocate buffers and either immediately free them, + * or pass to the next worker thread for it to be freed on it's behalf. Periodically + * all memory is freed by the thread. + * + * While the burn/worker threads run, the original main thread checks that all of the threads are still + * making progress every 10s (no locking issues, or exits from assertion failures), + * and that every chunk has been freed at least once during the previous cycle (if that was possible). + * + * The test does not expect to survive out-of-memory errors, so needs sufficient heap memory + * for non-direct buffers and the debug tracking objects that check the allocate buffers. + * (The timing is very interesting when Xmx is lowered to increase garbage collection pauses, but do + * not set it too low). + */ public class LongBufferPoolTest { private static final Logger logger = LoggerFactory.getLogger(LongBufferPoolTest.class); +final int avgBufferSize = 16 << 10; --- End diff -- Constants are typically uppercased with underscores as separators eg. `AVG_BUFFER_SIZE`. Do you also need this to be package private? It can be made private and static. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/279#discussion_r226129933 --- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java --- @@ -327,24 +371,35 @@ BufferCheck sample() return checks.get(index); } - -private int sum1toN(int n) -{ -return (n * (n + 1)) / 2; -} })); } -boolean first = true; while (!latch.await(10L, TimeUnit.SECONDS)) { -if (!first) -BufferPool.assertAllRecycled(); -first = false; +int stalledThreads = 0; +int doneThreads = 0; + for (AtomicBoolean progress : makingProgress) { -assert progress.get(); -progress.set(false); +if (progress.getAndSet(false) == false) --- End diff -- ```suggestion if (!progress.getAndSet(false)) ``` ---
[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/279#discussion_r226129803 --- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java --- @@ -181,7 +222,7 @@ void checkpoint() void testOne() throws Exception { -long currentTargetSize = rand.nextInt(poolSize / 1024) == 0 ? 0 : targetSize; +long currentTargetSize = (rand.nextInt(poolSize / 1024) == 0 || freedAllMemory[threadIdx].get() == false) ? 0 : targetSize; --- End diff -- ```suggestion long currentTargetSize = (rand.nextInt(poolSize / 1024) == 0 || !freedAllMemory[threadIdx].get()) ? 0 : targetSize; ``` This is the canonical way in Java. ---
[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/279#discussion_r225404206 --- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java --- @@ -327,24 +373,41 @@ BufferCheck sample() return checks.get(index); } - -private int sum1toN(int n) -{ -return (n * (n + 1)) / 2; -} })); } -boolean first = true; while (!latch.await(10L, TimeUnit.SECONDS)) { -if (!first) -BufferPool.assertAllRecycled(); -first = false; +int stalledThreads = 0; +int doneThreads = 0; + for (AtomicBoolean progress : makingProgress) { -assert progress.get(); -progress.set(false); +if (progress.getAndSet(false) == false) +stalledThreads++; +} + +for (Future r : ret) +{ +if (r.isDone()) +doneThreads++; +} +if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers. +{ // Assertions failures on the threads will be caught below. +assert stalledThreads == 0; +boolean allFreed = burnFreed.getAndSet(false); +for (AtomicBoolean freedMemory : freedAllMemory) +{ +allFreed = allFreed && freedMemory.getAndSet(false); +} +if (allFreed) +{ --- End diff -- According to the code style, you can skip braces here. Reference: http://cassandra.apache.org/doc/latest/development/code_style.html --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/279#discussion_r225404225 --- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java --- @@ -327,24 +373,41 @@ BufferCheck sample() return checks.get(index); } - -private int sum1toN(int n) -{ -return (n * (n + 1)) / 2; -} })); } -boolean first = true; while (!latch.await(10L, TimeUnit.SECONDS)) { -if (!first) -BufferPool.assertAllRecycled(); -first = false; +int stalledThreads = 0; +int doneThreads = 0; + for (AtomicBoolean progress : makingProgress) { -assert progress.get(); -progress.set(false); +if (progress.getAndSet(false) == false) +stalledThreads++; +} + +for (Future r : ret) +{ +if (r.isDone()) +doneThreads++; +} +if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers. +{ // Assertions failures on the threads will be caught below. +assert stalledThreads == 0; +boolean allFreed = burnFreed.getAndSet(false); +for (AtomicBoolean freedMemory : freedAllMemory) +{ +allFreed = allFreed && freedMemory.getAndSet(false); +} +if (allFreed) +{ +BufferPool.assertAllRecycled(); +} +else +{ --- End diff -- According to the code style, you can skip braces here. Reference: http://cassandra.apache.org/doc/latest/development/code_style.html --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/279#discussion_r225404189 --- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java --- @@ -327,24 +373,41 @@ BufferCheck sample() return checks.get(index); } - -private int sum1toN(int n) -{ -return (n * (n + 1)) / 2; -} })); } -boolean first = true; while (!latch.await(10L, TimeUnit.SECONDS)) { -if (!first) -BufferPool.assertAllRecycled(); -first = false; +int stalledThreads = 0; +int doneThreads = 0; + for (AtomicBoolean progress : makingProgress) { -assert progress.get(); -progress.set(false); +if (progress.getAndSet(false) == false) +stalledThreads++; +} + +for (Future r : ret) +{ +if (r.isDone()) +doneThreads++; +} +if (doneThreads == 0) // If any threads have completed, they will stop making progress/recycling buffers. +{ // Assertions failures on the threads will be caught below. +assert stalledThreads == 0; +boolean allFreed = burnFreed.getAndSet(false); +for (AtomicBoolean freedMemory : freedAllMemory) +{ --- End diff -- According to the code style, you can skip braces here. Reference: http://cassandra.apache.org/doc/latest/development/code_style.html --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #279: CASSANDRA-14790 Fix flaky LongBufferPoolTest
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/279#discussion_r225403619 --- Diff: test/burn/org/apache/cassandra/utils/memory/LongBufferPoolTest.java --- @@ -88,37 +110,51 @@ public void testAllocate(int threadCount, long duration, int poolSize) throws In final CountDownLatch latch = new CountDownLatch(threadCount); final SPSCQueue[] sharedRecycle = new SPSCQueue[threadCount]; final AtomicBoolean[] makingProgress = new AtomicBoolean[threadCount]; +AtomicBoolean burnFreed = new AtomicBoolean(false); --- End diff -- This can be made `final`. ---
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216088059 --- Diff: src/java/org/apache/cassandra/net/MessageIn.java --- @@ -231,4 +241,437 @@ public String toString() sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb); return sbuf.toString(); } + +public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion) +{ +return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER); + +} + +public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer messageConsumer) +{ +return messagingVersion >= MessagingService.VERSION_40 + ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer) + : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer); + +} + +/** + * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking + * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)} + * methods, respectively. + * + * Does not contain the actual deserialization code for message fields nor payload. That is left to the + * {@link MessageIn#read(DataInputPlus, int, int)} family of methods. + */ +public static abstract class MessageInProcessor +{ +/** + * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions. + */ +public enum State +{ +READ_PREFIX, +READ_IP_ADDRESS, +READ_VERB, +READ_PARAMETERS_SIZE, +READ_PARAMETERS_DATA, +READ_PAYLOAD_SIZE, +READ_PAYLOAD +} + +static final int VERB_LENGTH = Integer.BYTES; + +/** + * The default target for consuming deserialized {@link MessageIn}. 
+ */ +private static final BiConsumer MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id); + +final InetAddressAndPort peer; +final int messagingVersion; + +/** + * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane + * as they don't require nor trigger the entire message processing circus. + */ +final BiConsumer messageConsumer; + +/** + * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case. + */ +State state = State.READ_PREFIX; + +/** + * Captures the current data we've parsed out of in incoming message. Primarily useful in the non-blocking use case. + */ +MessageHeader messageHeader; + +/** + * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible, + * and send any fully deserialized messages to {@link #messageConsumer}. + */ +public abstract void process(ByteBuf in) throws IOException; + +/** + * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data, + * and send any fully deserialized messages to {@link #messageConsumer}. + */ +public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException; + +MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer messageConsumer) +{ +this.peer = peer; +this.messagingVersion = messagingVersion; +this.messageConsumer = messageConsumer; +} + +/** + * Only applicable in the non-blocking use case, and should ony be used for testing!!! + */ +@VisibleForTesting +public MessageHeader getMessageHeader() +{ +return messageHeader; +} + +/** + * A simple struct to hold the message header data as it is being built up. + */ +public static class MessageHeader +{ +public int messageId; +long constructionTime; +public InetAddressAndPort from; +public MessagingService.Verb verb; +int payloadSize;
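The `getProcessor` factory quoted above dispatches on the peer's messaging version. A minimal sketch of that shape follows; the version constants and the `name()` method are illustrative stand-ins, not the real `MessageInProcessor` API.

```java
public class ProcessorFactory
{
    // Illustrative protocol version ids, not authoritative values.
    static final int VERSION_30 = 10;
    static final int VERSION_40 = 12;

    interface MessageInProcessor
    {
        String name();
    }

    // Same shape as MessageIn.getProcessor(): pick the parser implementation
    // based on the peer's negotiated protocol version.
    static MessageInProcessor getProcessor(int messagingVersion)
    {
        if (messagingVersion >= VERSION_40)
            return () -> "asOf40";
        return () -> "pre40";
    }

    public static void main(String[] args)
    {
        System.out.println(getProcessor(VERSION_40).name());
        System.out.println(getProcessor(VERSION_30).name());
    }
}
```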
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216103691 --- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java --- @@ -18,143 +18,296 @@ package org.apache.cassandra.net.async; -import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; -import java.util.Collections; -import java.util.EnumMap; -import java.util.List; -import java.util.Map; -import java.util.function.BiConsumer; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; -import com.google.common.primitives.Ints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import org.apache.cassandra.io.util.DataInputBuffer; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.ReferenceCountUtil; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.exceptions.UnknownTableException; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.net.ParameterType; -import org.apache.cassandra.utils.vint.VIntCoding; +import org.apache.cassandra.net.MessageIn.MessageInProcessor; /** * Parses incoming messages as per the 4.0 internode messaging protocol. 
*/ -public class MessageInHandler extends BaseMessageInHandler +public class MessageInHandler extends ChannelInboundHandlerAdapter { public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class); -private MessageHeader messageHeader; +private final InetAddressAndPort peer; -MessageInHandler(InetAddressAndPort peer, int messagingVersion) +private final BufferHandler bufferHandler; +private volatile boolean closed; + +public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor, boolean handlesLargeMessages) +{ +this.peer = peer; + +bufferHandler = handlesLargeMessages +? new BlockingBufferHandler(messageProcessor) +: new NonblockingBufferHandler(messageProcessor); +} + +public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException +{ +if (!closed) +{ +bufferHandler.channelRead(ctx, (ByteBuf) msg); +} +else +{ +ReferenceCountUtil.release(msg); +ctx.close(); +} +} + +public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) +{ +if (cause instanceof EOFException) +logger.trace("eof reading from socket; closing", cause); +else if (cause instanceof UnknownTableException) +logger.warn("Got message from unknown table while reading from socket; closing", cause); +else if (cause instanceof IOException) +logger.trace("IOException reading from socket; closing", cause); +else +logger.warn("Unexpected exception caught in inbound channel pipeline from " + ctx.channel().remoteAddress(), cause); + +close(); +ctx.close(); +} + +public void channelInactive(ChannelHandlerContext ctx) +{ +logger.trace("received channel closed message for peer {} on local addr {}", ctx.channel().remoteAddress(), ctx.channel().localAddress()); +close(); +ctx.fireChannelInactive(); +} + +void close() { -this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER); +closed = true; +bufferHandler.close(); } -public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer messageConsumer) +boolean isClosed() { 
-super(peer, messagingVersion, messageConsumer); +return closed; +} -assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong messaging version for this handler: got %d, but expect %d or higher", - messagingVersion, MessagingService.VERSION_40); -state = State.R
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216105848 --- Diff: src/java/org/apache/cassandra/net/MessageOut.java --- @@ -180,6 +199,73 @@ public String toString() return sbuf.toString(); } +/** + * The main entry point for sending an internode message to a peer node in the cluster. + */ +public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException +{ +captureTracingInfo(destinationId); + +out.writeInt(MessagingService.PROTOCOL_MAGIC); +out.writeInt(id); + +// int cast cuts off the high-order half of the timestamp, which we can assume remains +// the same between now and when the recipient reconstructs it. +out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos)); +serialize(out, messagingVersion); +} + +/** + * Record any tracing data, if enabled on this message. + */ +@VisibleForTesting +void captureTracingInfo(OutboundConnectionIdentifier destinationId) +{ +try +{ +UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION); +if (sessionId != null) +{ +TraceState state = Tracing.instance.get(sessionId); +String logMessage = String.format("Sending %s message to %s", verb, destinationId.connectionAddress()); +// session may have already finished; see CASSANDRA-5668 +if (state == null) +{ +Tracing.TraceType traceType = (Tracing.TraceType)getParameter(ParameterType.TRACE_TYPE); +traceType = traceType == null ? 
Tracing.TraceType.QUERY : traceType; + Tracing.instance.trace(ByteBuffer.wrap(UUIDGen.decompose(sessionId)), logMessage, traceType.getTTL()); +} +else +{ +state.trace(logMessage); +if (verb == MessagingService.Verb.REQUEST_RESPONSE) +Tracing.instance.doneWithNonLocalSession(state); +} +} +} +catch (Exception e) +{ +logger.warn("failed to capture the tracing info for an outbound message to {}, ignoring", destinationId, e); +} +} + +private Object getParameter(ParameterType type) +{ +for (int ii = 0; ii < parameters.size(); ii += PARAMETER_TUPLE_SIZE) +{ +if (((ParameterType)parameters.get(ii + PARAMETER_TUPLE_TYPE_OFFSET)).equals(type)) --- End diff -- Don't need the typecast to `ParameterType` --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
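The flattened-tuple scan in `getParameter` can be shown in a self-contained form. The layout constants and the `ParameterType` enum below are assumptions modeled on the diff; the point the review makes is that `equals` dispatches on `Object`, so no cast to `ParameterType` is needed before the comparison.

```java
import java.util.ArrayList;
import java.util.List;

public class ParamScan
{
    // Layout assumed from the diff: parameters are stored flat as [type, name, value, ...]
    static final int PARAMETER_TUPLE_SIZE = 3;
    static final int PARAMETER_TUPLE_TYPE_OFFSET = 0;
    static final int PARAMETER_TUPLE_VALUE_OFFSET = 2;

    enum ParameterType { TRACE_SESSION, TRACE_TYPE }

    static Object getParameter(List<Object> parameters, ParameterType type)
    {
        for (int i = 0; i < parameters.size(); i += PARAMETER_TUPLE_SIZE)
        {
            // No cast needed: equals() is declared on Object and dispatches virtually,
            // which is the reviewer's point about the quoted code.
            if (parameters.get(i + PARAMETER_TUPLE_TYPE_OFFSET).equals(type))
                return parameters.get(i + PARAMETER_TUPLE_VALUE_OFFSET);
        }
        return null;
    }

    public static void main(String[] args)
    {
        List<Object> params = new ArrayList<>();
        params.add(ParameterType.TRACE_TYPE);    params.add("trace_type");    params.add("QUERY");
        params.add(ParameterType.TRACE_SESSION); params.add("trace_session"); params.add("abc-123");
        System.out.println(getParameter(params, ParameterType.TRACE_SESSION));
    }
}
```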
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216106796 --- Diff: src/java/org/apache/cassandra/net/MessageOut.java --- @@ -180,6 +199,73 @@ public String toString() return sbuf.toString(); } +/** + * The main entry point for sending an internode message to a peer node in the cluster. + */ +public void serialize(DataOutputPlus out, int messagingVersion, OutboundConnectionIdentifier destinationId, int id, long timestampNanos) throws IOException +{ +captureTracingInfo(destinationId); + +out.writeInt(MessagingService.PROTOCOL_MAGIC); +out.writeInt(id); + +// int cast cuts off the high-order half of the timestamp, which we can assume remains +// the same between now and when the recipient reconstructs it. +out.writeInt((int) NanoTimeToCurrentTimeMillis.convert(timestampNanos)); +serialize(out, messagingVersion); +} + +/** + * Record any tracing data, if enabled on this message. + */ +@VisibleForTesting +void captureTracingInfo(OutboundConnectionIdentifier destinationId) +{ +try +{ +UUID sessionId = (UUID)getParameter(ParameterType.TRACE_SESSION); +if (sessionId != null) --- End diff -- I assume `sessionId != null` means that tracing is enabled? Otherwise we should explicitly check whether tracing is enabled. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216102238 --- Diff: test/unit/org/apache/cassandra/net/MessageInProcessorPre40Test.java --- @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net; + +import java.io.IOException; +import java.net.UnknownHostException; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessageIn.MessageInProcessorPre40; +import org.apache.cassandra.net.async.ByteBufDataOutputPlus; + +public class MessageInProcessorPre40Test +{ +private static InetAddressAndPort addr; + +private MessageInProcessorPre40 processor; +private ByteBuf buf; + +@BeforeClass +public static void before() throws UnknownHostException +{ +DatabaseDescriptor.daemonInitialization(); +addr = InetAddressAndPort.getByName("127.0.0.1"); +} + +@Before +public void setup() +{ +processor = new MessageInProcessorPre40(addr, MessagingService.VERSION_30, (messageIn, integer) -> {}); +} + +@After +public void tearDown() +{ +if (buf != null && buf.refCnt() > 0) +buf.release(); +} + +@Test +public void canReadNextParam_HappyPath() throws IOException +{ +buildParamBufPre40(13); +Assert.assertTrue(processor.canReadNextParam(buf)); +} + +@Test +public void canReadNextParam_OnlyFirstByte() throws IOException +{ +buildParamBufPre40(13); +buf.writerIndex(1); +Assert.assertFalse(processor.canReadNextParam(buf)); +} + +@Test +public void canReadNextParam_PartialUTF() throws IOException +{ +buildParamBufPre40(13); +buf.writerIndex(5); +Assert.assertFalse(processor.canReadNextParam(buf)); +} + +@Test +public void canReadNextParam_TruncatedValueLength() throws IOException +{ +buildParamBufPre40(13); +buf.writerIndex(buf.writerIndex() - 13 - 2); +Assert.assertFalse(processor.canReadNextParam(buf)); +} + +@Test +public void canReadNextParam_MissingLastBytes() throws IOException +{ +buildParamBufPre40(13); +buf.writerIndex(buf.writerIndex() - 2); 
+Assert.assertFalse(processor.canReadNextParam(buf)); +} + +private void buildParamBufPre40(int valueLength) throws IOException +{ +buf = Unpooled.buffer(1024, 1024); // 1k should be enough for everybody! --- End diff -- 🤣 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
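The `canReadNextParam` tests above all probe one idea: a length-prefixed parameter must be fully buffered before it can be decoded. A hedged sketch with `java.nio.ByteBuffer` follows; the wire shape here (a 2-byte name length, name bytes, a 4-byte value length, value bytes) is assumed from the pre-4.0 tests, not copied from the real implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ParamCheck
{
    // Returns true only if the buffer holds one complete parameter.
    static boolean canReadNextParam(ByteBuffer buf)
    {
        buf = buf.duplicate(); // peek without consuming the caller's position
        if (buf.remaining() < 2)
            return false;
        int nameLen = buf.getShort() & 0xFFFF;
        if (buf.remaining() < nameLen + 4)
            return false;       // partial name or truncated value length
        buf.position(buf.position() + nameLen);
        int valueLen = buf.getInt();
        return buf.remaining() >= valueLen; // missing last bytes -> false
    }

    public static void main(String[] args)
    {
        byte[] name = "KEY".getBytes(StandardCharsets.UTF_8);
        byte[] value = new byte[13]; // mirrors buildParamBufPre40(13)
        ByteBuffer full = ByteBuffer.allocate(64);
        full.putShort((short) name.length).put(name).putInt(value.length).put(value).flip();

        System.out.println(canReadNextParam(full.duplicate())); // complete param

        ByteBuffer truncated = full.duplicate();
        truncated.limit(full.limit() - 2);                      // "missing last bytes" case
        System.out.println(canReadNextParam(truncated));
    }
}
```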
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216086521 --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java --- @@ -36,6 +37,11 @@ public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel { +/** + * Default to a very large value. + */ +private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2); --- End diff -- I think it doesn't make sense to wait for more than 5 minutes to rebuffer. Given buffer sizes are on the order of a few megabytes at most, 5 minutes is an eternity. It's best to keep the timeout short so failures surface sooner rather than later, instead of leaving threads stuck waiting on a call that doesn't time out in a reasonable amount of time.
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r216047177 --- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java --- @@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise) out = ctx.alloc().ioBuffer((int)currentFrameSize); -captureTracingInfo(msg); -serializeMessage(msg, out); +@SuppressWarnings("resource") +ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out); +msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos); + +// next few lines are for debugging ... massively helpful!! +// if we allocated too much buffer for this message, we'll log here. +// if we allocated to little buffer space, we would have hit an exception when trying to write more bytes to it +if (out.isWritable()) +errorLogger.error("{} reported message size {}, actual message size {}, msg {}", --- End diff -- It's fine. We can leave it in. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r215733891 --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java --- @@ -36,6 +37,11 @@ public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel { +/** + * Default to a very large value. + */ +private static final long DEFAULT_REBUFFER_BLOCK_IN_MILLIS = TimeUnit.DAYS.toMillis(2); --- End diff -- It's risky to default to a very large value. Why do we want to default to such a large value?
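The trade-off raised here, a short bounded wait versus an effectively unbounded one, can be sketched with a plain `BlockingQueue` standing in for the network channel. The 5-minute constant mirrors the reviewer's suggestion and is not the committed value.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedRebuffer
{
    // A short, explicit bound (minutes, not days), per the review suggestion.
    static final long REBUFFER_BLOCK_MILLIS = TimeUnit.MINUTES.toMillis(5);

    // Wait for the next chunk of bytes, failing fast once the bound is exceeded
    // instead of leaving the reader thread parked indefinitely.
    static byte[] rebuffer(BlockingQueue<byte[]> incoming, long timeoutMillis) throws IOException
    {
        try
        {
            byte[] next = incoming.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (next == null)
                throw new IOException("timed out waiting for more bytes; peer likely hung");
            return next;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IOException("interrupted while rebuffering", e);
        }
    }

    public static void main(String[] args) throws Exception
    {
        BlockingQueue<byte[]> q = new LinkedBlockingQueue<>();
        q.add(new byte[]{ 1, 2, 3 });
        System.out.println(rebuffer(q, 100).length); // buffered data returns immediately
        try
        {
            rebuffer(q, 100); // empty queue: surfaces the failure instead of hanging
        }
        catch (IOException e)
        {
            System.out.println("timeout");
        }
    }
}
```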
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r215735492 --- Diff: src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java --- @@ -393,6 +393,18 @@ private Channel getOrCreateChannel() } } +private void onError(Throwable t) +{ +try +{ +session.onError(t).get(5, TimeUnit.MINUTES); --- End diff -- Why do we have a 5-minute timeout? We should pull this out as a constant.
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r215739220 --- Diff: src/java/org/apache/cassandra/net/MessageIn.java --- @@ -231,4 +241,437 @@ public String toString() sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append(" VERB:").append(verb); return sbuf.toString(); } + +public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion) +{ +return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER); + +} + +public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer messageConsumer) +{ +return messagingVersion >= MessagingService.VERSION_40 + ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer) + : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer); + +} + +/** + * Implementations contain the mechanics and logic of parsing incoming messages. Allows for both non-blocking + * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link #process(RebufferingByteBufDataInputPlus)} + * methods, respectively. + * + * Does not contain the actual deserialization code for message fields nor payload. That is left to the + * {@link MessageIn#read(DataInputPlus, int, int)} family of methods. + */ +public static abstract class MessageInProcessor +{ +/** + * The current state of deserializing an incoming message. This enum is only used in the nonblocking versions. + */ +public enum State +{ +READ_PREFIX, +READ_IP_ADDRESS, +READ_VERB, +READ_PARAMETERS_SIZE, +READ_PARAMETERS_DATA, +READ_PAYLOAD_SIZE, +READ_PAYLOAD +} + +static final int VERB_LENGTH = Integer.BYTES; + +/** + * The default target for consuming deserialized {@link MessageIn}. 
+ */ +private static final BiConsumer MESSAGING_SERVICE_CONSUMER = (messageIn, id) -> MessagingService.instance().receive(messageIn, id); + +final InetAddressAndPort peer; +final int messagingVersion; + +/** + * Abstracts out depending directly on {@link MessagingService#receive(MessageIn, int)}; this makes tests more sane + * as they don't require nor trigger the entire message processing circus. + */ +final BiConsumer messageConsumer; + +/** + * Captures the current {@link State} of processing a message. Primarily useful in the non-blocking use case. + */ +State state = State.READ_PREFIX; + +/** + * Captures the current data we've parsed out of in incoming message. Primarily useful in the non-blocking use case. + */ +MessageHeader messageHeader; + +/** + * Process the buffer in a non-blocking manner. Will try to read out as much of a message(s) as possible, + * and send any fully deserialized messages to {@link #messageConsumer}. + */ +public abstract void process(ByteBuf in) throws IOException; + +/** + * Process the buffer in a blocking manner. Will read as many messages as possible, blocking for more data, + * and send any fully deserialized messages to {@link #messageConsumer}. + */ +public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException; + +MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer messageConsumer) +{ +this.peer = peer; +this.messagingVersion = messagingVersion; +this.messageConsumer = messageConsumer; +} + +/** + * Only applicable in the non-blocking use case, and should ony be used for testing!!! + */ +@VisibleForTesting +public MessageHeader getMessageHeader() +{ +return messageHeader; +} + +/** + * A simple struct to hold the message header data as it is being built up. + */ +public static class MessageHeader +{ +public int messageId; +long constructionTime; +public InetAddressAndPort from; +public MessagingService.Verb verb; +int payloadSize;
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r215733539 --- Diff: src/java/org/apache/cassandra/net/async/MessageOutHandler.java --- @@ -140,8 +125,17 @@ public void write(ChannelHandlerContext ctx, Object o, ChannelPromise promise) out = ctx.alloc().ioBuffer((int)currentFrameSize); -captureTracingInfo(msg); -serializeMessage(msg, out); +@SuppressWarnings("resource") +ByteBufDataOutputPlus outputPlus = new ByteBufDataOutputPlus(out); +msg.message.serialize(outputPlus, targetMessagingVersion, connectionId, msg.id, msg.timestampNanos); + +// next few lines are for debugging ... massively helpful!! +// if we allocated too much buffer for this message, we'll log here. +// if we allocated to little buffer space, we would have hit an exception when trying to write more bytes to it +if (out.isWritable()) +errorLogger.error("{} reported message size {}, actual message size {}, msg {}", --- End diff -- How likely are we to cause log spam? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
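The `out.isWritable()` check under discussion flags a mismatch between the size a message reported up front and the bytes it actually wrote. A simplified sketch with `ByteBuffer` follows; the `serializedSize`/`serialize` pair is hypothetical, not the `MessageOut` API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SizeCheck
{
    // Hypothetical up-front size computation a serializer would report.
    static int serializedSize(String payload)
    {
        return Integer.BYTES + payload.getBytes(StandardCharsets.UTF_8).length;
    }

    static void serialize(String payload, ByteBuffer out)
    {
        byte[] bytes = payload.getBytes(StandardCharsets.UTF_8);
        out.putInt(bytes.length);
        out.put(bytes);
    }

    public static void main(String[] args)
    {
        String msg = "hello";
        int reported = serializedSize(msg);
        ByteBuffer out = ByteBuffer.allocate(reported); // allocate exactly what was reported
        serialize(msg, out);
        // Mirrors the out.isWritable() check: leftover room means the reported size
        // over-counted; under-counting would have thrown while writing instead.
        if (out.hasRemaining())
            System.out.println("size mismatch: reported " + reported + ", wrote " + out.position());
        else
            System.out.println("exact fit");
    }
}
```

Because the message only logs on a mismatch, the spam risk the review asks about is bounded by how often the size calculation is wrong.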
[GitHub] cassandra pull request #253: 13630
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/253#discussion_r215734926 --- Diff: src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java --- @@ -183,6 +195,11 @@ public int available() throws EOFException return availableBytes; } +public boolean isEmpty() throws EOFException +{ +return available() == 0; --- End diff -- The method `available()` has a side effect of `channelConfig.setAutoRead(true)` when the `availableBytes` falls below the `lowWatermark`. Are you sure that is ok?
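The side effect flagged here, a getter that also flips channel state, can be modeled without Netty. `AutoReadToggle` below is a toy stand-in, not the real `RebufferingByteBufDataInputPlus`, and the watermark values are invented.

```java
public class AutoReadToggle
{
    // Hypothetical low watermark; the point is the hidden state change,
    // not the specific threshold.
    static final int LOW_WATER_MARK = 8;

    private int availableBytes = 20;
    boolean autoRead = false;

    int available()
    {
        if (availableBytes < LOW_WATER_MARK)
            autoRead = true; // side effect: querying availability re-opens the read tap
        return availableBytes;
    }

    boolean isEmpty()
    {
        // Delegating to available() means an innocent-looking emptiness check
        // can silently re-enable reads, which is the behavior questioned above.
        return available() == 0;
    }

    public static void main(String[] args)
    {
        AutoReadToggle in = new AutoReadToggle();
        System.out.println(in.isEmpty() + " autoRead=" + in.autoRead);
        in.availableBytes = 0; // drain the buffer below the watermark
        System.out.println(in.isEmpty() + " autoRead=" + in.autoRead);
    }
}
```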
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213558154 --- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.apache.cassandra.utils.FBUtilities; + +public class QueryReplayer implements Closeable +{ +private final ExecutorService es = Executors.newFixedThreadPool(1); +private final Iterator> queryIterator; +private final List targetHosts; +private final List targetClusters; +private final List> filters; +private final List sessions; +private final ResultHandler resultHandler; +private final MetricRegistry metrics = new MetricRegistry(); +private final boolean debug; +private final PrintStream out; + +public QueryReplayer(Iterator> queryIterator, + List targetHosts, + List resultPaths, + List> filters, + PrintStream out, + String useKeyspace, + String queryFilePathString, + boolean debug) +{ +this.queryIterator = queryIterator; +this.targetHosts = targetHosts; +targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList()); +this.filters = filters; +sessions = useKeyspace != null ? + targetClusters.stream().map(c -> c.connect(useKeyspace)).collect(Collectors.toList()) : + targetClusters.stream().map(Cluster::connect).collect(Collectors.toList()); +File queryFilePath = queryFilePathString != null ? 
new File(queryFilePathString) : null; +resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath); +this.debug = debug; +this.out = out; +} + +public void replay() +{ +while (queryIterator.hasNext()) +{ +List queries = queryIterator.next(); +for (FQLQuery query : queries) +{ +if (filters.stream().anyMatch(f -> !f.test(query))) +continue; +try (Timer.Context ctx = metrics.timer("queries").time()) +{ + List> results = new ArrayList<>(sessions.size()); +for (Session session : sessions) +{ +try +{ +if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace())) +{ +if (debug) +{ +out.println(String.format("Switching keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspace)); +
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213557602 --- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.apache.cassandra.utils.FBUtilities; + +public class QueryReplayer implements Closeable +{ +private final ExecutorService es = Executors.newFixedThreadPool(1); +private final Iterator> queryIterator; +private final List targetHosts; +private final List targetClusters; +private final List> filters; +private final List sessions; +private final ResultHandler resultHandler; +private final MetricRegistry metrics = new MetricRegistry(); +private final boolean debug; +private final PrintStream out; + +public QueryReplayer(Iterator> queryIterator, + List targetHosts, + List resultPaths, + List> filters, + PrintStream out, + String useKeyspace, + String queryFilePathString, + boolean debug) +{ +this.queryIterator = queryIterator; +this.targetHosts = targetHosts; +targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList()); +this.filters = filters; +sessions = useKeyspace != null ? + targetClusters.stream().map(c -> c.connect(useKeyspace)).collect(Collectors.toList()) : + targetClusters.stream().map(Cluster::connect).collect(Collectors.toList()); +File queryFilePath = queryFilePathString != null ? 
new File(queryFilePathString) : null; +resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath); +this.debug = debug; +this.out = out; +} + +public void replay() +{ +while (queryIterator.hasNext()) +{ +List queries = queryIterator.next(); +for (FQLQuery query : queries) +{ +if (filters.stream().anyMatch(f -> !f.test(query))) +continue; +try (Timer.Context ctx = metrics.timer("queries").time()) +{ + List> results = new ArrayList<>(sessions.size()); +for (Session session : sessions) +{ +try +{ +if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace())) +{ +if (debug) +{ --- End diff -- Single statement blocks do not require braces. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
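The filter-skip shape in `replay()`, `filters.stream().anyMatch(f -> !f.test(query))`, acts as a conjunction of predicates: a query is replayed only if every filter accepts it. A standalone sketch follows; the `Query` class and the filters themselves are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FilterSkip
{
    static final class Query
    {
        final String keyspace, text;
        Query(String keyspace, String text) { this.keyspace = keyspace; this.text = text; }
    }

    public static void main(String[] args)
    {
        List<Predicate<Query>> filters = Arrays.asList(
                q -> q.keyspace.equals("ks1"),      // keep only ks1 traffic
                q -> !q.text.startsWith("SELECT")); // drop reads

        List<Query> queries = Arrays.asList(
                new Query("ks1", "INSERT INTO t (id) VALUES (1)"),
                new Query("ks1", "SELECT * FROM t"),
                new Query("ks2", "INSERT INTO t (id) VALUES (2)"));

        for (Query q : queries)
        {
            // Same shape as the replayer: skip when ANY filter rejects the query.
            if (filters.stream().anyMatch(f -> !f.test(q)))
                continue;
            System.out.println("replaying: " + q.text);
        }
    }
}
```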
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213559247 --- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQueryIterator.java --- @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.util.PriorityQueue; + +import net.openhft.chronicle.queue.ExcerptTailer; +import org.apache.cassandra.utils.AbstractIterator; + +public class FQLQueryIterator extends AbstractIterator +{ +private final PriorityQueue pq; +private final ExcerptTailer tailer; +private final FQLQueryReader reader; + +/** + * Create an iterator over the FQLQueries in tailer + * + * Reads up to readAhead queries in to memory to be able to sort them (the files are mostly sorted already) + */ +public FQLQueryIterator(ExcerptTailer tailer, int readAhead, boolean legacyFiles) +{ +assert readAhead > 0 : "ReadAhead needs to be > 0"; --- End diff -- Nit: `ReadAhead` -> `readAhead` in the message --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
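The constructor being reviewed fills a PriorityQueue with up to `readAhead` queries so that a mostly-sorted input stream comes out fully sorted. A hedged, self-contained sketch of that pattern (the class and method names below are illustrative stand-ins, not the Cassandra API), including the lower-cased assertion message the nit asks for:

```java
import java.util.Iterator;
import java.util.PriorityQueue;

// Read-ahead sorting: keep a window of `readAhead` elements in a min-heap so
// the output is sorted as long as no element arrives more than `readAhead`
// positions out of place (FQL files are "mostly sorted already").
public class ReadAheadSortingIterator<T extends Comparable<T>> implements Iterator<T>
{
    private final PriorityQueue<T> pq = new PriorityQueue<>();
    private final Iterator<T> source;

    public ReadAheadSortingIterator(Iterator<T> source, int readAhead)
    {
        assert readAhead > 0 : "readAhead needs to be > 0"; // lower-case 'r', per the review nit
        this.source = source;
        for (int i = 0; i < readAhead && source.hasNext(); i++)
            pq.add(source.next());
    }

    public boolean hasNext()
    {
        return !pq.isEmpty();
    }

    public T next()
    {
        T smallest = pq.poll();
        if (source.hasNext())
            pq.add(source.next()); // top the window back up
        return smallest;
    }

    public static void main(String[] args)
    {
        Iterator<Integer> it = new ReadAheadSortingIterator<>(java.util.List.of(2, 1, 3, 5, 4).iterator(), 2);
        while (it.hasNext())
            System.out.print(it.next() + " "); // prints: 1 2 3 4 5
        System.out.println();
    }
}
```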
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213557051 --- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.apache.cassandra.utils.FBUtilities; + +public class QueryReplayer implements Closeable +{ +private final ExecutorService es = Executors.newFixedThreadPool(1); +private final Iterator<List<FQLQuery>> queryIterator; +private final List<String> targetHosts; --- End diff -- This field is never read outside the constructor, so it can be a plain local variable (or just the constructor parameter) instead of a field.
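The review points out that `targetHosts` is only consumed while constructing other members. A simplified stand-in (hypothetical names, and the `:9042` CQL port suffix is assumed purely for illustration) showing the cleanup: use the constructor parameter directly and keep only the derived state as a field.

```java
import java.util.List;
import java.util.stream.Collectors;

// The host list is consumed in the constructor to build derived members,
// so it need not be stored as a field.
public class HostConnector
{
    private final List<String> endpoints; // derived from targetHosts, actually kept
    // (no `private final List<String> targetHosts;` field -- the parameter suffices)

    public HostConnector(List<String> targetHosts)
    {
        this.endpoints = targetHosts.stream()
                                    .map(h -> h + ":9042") // assumed default CQL port, illustration only
                                    .collect(Collectors.toList());
    }

    public List<String> endpoints()
    {
        return endpoints;
    }

    public static void main(String[] args)
    {
        System.out.println(new HostConnector(List.of("a", "b")).endpoints());
    }
}
```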
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213564852 --- Diff: src/java/org/apache/cassandra/tools/fqltool/ResultStore.java --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import net.openhft.chronicle.bytes.BytesStore; +import net.openhft.chronicle.core.io.Closeable; +import net.openhft.chronicle.queue.ChronicleQueue; +import net.openhft.chronicle.queue.ChronicleQueueBuilder; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.wire.ValueOut; +import org.apache.cassandra.utils.binlog.BinLog; + +/** + * see FQLReplayTest#readResultFile for how to read files produced by this class + */ +public class ResultStore +{ +private final List queues; +private final List appenders; +private final ChronicleQueue queryStoreQueue; +private final ExcerptAppender queryStoreAppender; +private final Set finishedHosts = new HashSet<>(); +private final File queryFilePath; --- End diff -- Unused variable. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213562282 --- Diff: src/java/org/apache/cassandra/tools/fqltool/DriverResultSet.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +import com.google.common.collect.AbstractIterator; + +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import org.apache.cassandra.utils.ByteBufferUtil; + + +/** + * Wraps a result set from the driver so that we can reuse the compare code when reading + * up a result set produced by ResultStore. 
+ */ +public class DriverResultSet implements ResultHandler.ComparableResultSet +{ +private final ResultSet resultSet; +private final Throwable failureException; + +public DriverResultSet(ResultSet resultSet) +{ +this(resultSet, null); +} + +private DriverResultSet(ResultSet res, Throwable failureException) +{ +resultSet = res; +this.failureException = failureException; +} + +public static DriverResultSet failed(Throwable ex) +{ +return new DriverResultSet(null, ex); +} + +public ResultHandler.ComparableColumnDefinitions getColumnDefinitions() +{ +if (wasFailed()) +return new DriverColumnDefinitions(null, true); + +return new DriverColumnDefinitions(resultSet.getColumnDefinitions()); +} + +public boolean wasFailed() +{ +return failureException != null; +} + +public Throwable getFailureException() +{ +return failureException; +} + +public Iterator<ResultHandler.ComparableRow> iterator() +{ +if (wasFailed()) +return Collections.emptyListIterator(); +return new AbstractIterator<ResultHandler.ComparableRow>() +{ +Iterator<Row> iter = resultSet.iterator(); +protected ResultHandler.ComparableRow computeNext() +{ +if (iter.hasNext()) +{ --- End diff -- Unnecessary braces for a single-statement block (there are a bunch of these in this file).
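`DriverResultSet` above holds either a driver result set or the `Throwable` that replaced it, with a static `failed(...)` factory, so failures can be stored and compared like ordinary results. A generic stdlib sketch of that "result or failure" wrapper shape (names are illustrative, not the patch's API):

```java
// Either a successful payload or the Throwable that produced the failure;
// exactly one of the two fields is non-null.
public class Outcome<T>
{
    private final T value;
    private final Throwable failure;

    private Outcome(T value, Throwable failure)
    {
        this.value = value;
        this.failure = failure;
    }

    public static <T> Outcome<T> of(T value)         { return new Outcome<>(value, null); }
    public static <T> Outcome<T> failed(Throwable t) { return new Outcome<>(null, t); }

    public boolean wasFailed()          { return failure != null; }
    public T value()                    { return value; }
    public Throwable failureException() { return failure; }

    public static void main(String[] args)
    {
        Outcome<String> ok = Outcome.of("row");
        Outcome<String> bad = Outcome.failed(new RuntimeException("boom"));
        System.out.println(ok.wasFailed() + " " + bad.wasFailed()); // prints: false true
    }
}
```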
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213563398 --- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.cassandra.audit.FullQueryLogger; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.binlog.BinLog; + +public abstract class FQLQuery implements Comparable +{ +public final long queryTime; +public final QueryOptions queryOptions; +public final int protocolVersion; +public final String keyspace; + +public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryTime) +{ +this.queryTime = queryTime; +this.queryOptions = queryOptions; +this.protocolVersion = protocolVersion; +this.keyspace = keyspace; +} + +public abstract ListenableFuture execute(Session session); + +/** + * used when storing the queries executed + */ +public abstract BinLog.ReleaseableWriteMarshallable toMarshallable(); + +/** + * Make sure we catch any query errors + * + * On error, this creates a failed ComparableResultSet with the exception set to be able to store + * this fact in the result file and handle comparison of failed result sets. 
+ */ +ListenableFuture handleErrors(ListenableFuture result) +{ +FluentFuture fluentFuture = FluentFuture.from(result) + .transform(DriverResultSet::new, MoreExecutors.directExecutor()); +return fluentFuture.catching(Throwable.class, DriverResultSet::failed, MoreExecutors.directExecutor()); +} + +public boolean equals(Object o) +{ +if (this == o) return true; +if (!(o instanceof FQLQuery)) return false; +FQLQuery fqlQuery = (FQLQuery) o; +return queryTime == fqlQuery.queryTime && + protocolVersion == fqlQuery.protocolVersion && + queryOptions.getValues().equals(fqlQuery.queryOptions.getValues()) && + Objects.equals(keyspace, fqlQuery.keyspace); +} + +public int hashCode() +{ +return Objects.hash(queryTime, queryOptions, protocolVersion, keyspace); +} + +public int compareTo(FQLQuery other) +{ +return Longs.compare(queryTime, other.queryTime); +} + +public static class Single extends FQLQuery +{ +public final String query; +public final List values; + +public Single(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryTime, String queryString, List values) +{ +super(keyspace, protocolVersion, queryOptions, queryTime); +this.query = queryString; +this.values = values; +}
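The `handleErrors` chain quoted above uses Guava's `FluentFuture`: map a successful result into a wrapper, and convert any exception into a "failed" wrapper instead of propagating it. A stdlib `CompletableFuture` analogue of the same idea, with a toy `Result` type standing in for `DriverResultSet`:

```java
import java.util.concurrent.CompletableFuture;

public class HandleErrorsSketch
{
    static final class Result
    {
        final String value;
        final Throwable failure;
        Result(String value, Throwable failure) { this.value = value; this.failure = failure; }
        static Result ok(String v)        { return new Result(v, null); }
        static Result failed(Throwable t) { return new Result(null, t); }
        boolean wasFailed()               { return failure != null; }
    }

    // The returned future never completes exceptionally: failures become
    // Result.failed(...) values. Note the Throwable seen by exceptionally()
    // may be wrapped in a CompletionException.
    static CompletableFuture<Result> handleErrors(CompletableFuture<String> f)
    {
        return f.thenApply(Result::ok)
                .exceptionally(Result::failed);
    }

    public static void main(String[] args)
    {
        Result ok = handleErrors(CompletableFuture.completedFuture("row")).join();
        Result bad = handleErrors(CompletableFuture.<String>failedFuture(new RuntimeException("boom"))).join();
        System.out.println(ok.wasFailed() + " " + bad.wasFailed()); // prints: false true
    }
}
```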
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213562501 --- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.Futures; --- End diff -- unused import. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213558579 --- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.apache.cassandra.utils.FBUtilities; + +public class QueryReplayer implements Closeable +{ +private final ExecutorService es = Executors.newFixedThreadPool(1); +private final Iterator> queryIterator; +private final List targetHosts; +private final List targetClusters; +private final List> filters; +private final List sessions; +private final ResultHandler resultHandler; +private final MetricRegistry metrics = new MetricRegistry(); +private final boolean debug; +private final PrintStream out; + +public QueryReplayer(Iterator> queryIterator, + List targetHosts, + List resultPaths, + List> filters, + PrintStream out, + String useKeyspace, + String queryFilePathString, + boolean debug) +{ +this.queryIterator = queryIterator; +this.targetHosts = targetHosts; +targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList()); +this.filters = filters; +sessions = useKeyspace != null ? + targetClusters.stream().map(c -> c.connect(useKeyspace)).collect(Collectors.toList()) : + targetClusters.stream().map(Cluster::connect).collect(Collectors.toList()); +File queryFilePath = queryFilePathString != null ? 
new File(queryFilePathString) : null; +resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath); +this.debug = debug; +this.out = out; +} + +public void replay() +{ +while (queryIterator.hasNext()) +{ +List queries = queryIterator.next(); +for (FQLQuery query : queries) +{ +if (filters.stream().anyMatch(f -> !f.test(query))) +continue; +try (Timer.Context ctx = metrics.timer("queries").time()) +{ + List> results = new ArrayList<>(sessions.size()); +for (Session session : sessions) +{ +try +{ +if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace())) +{ +if (debug) +{ +out.println(String.format("Switching keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspace)); +
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213558075 --- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.Closeable; +import java.io.File; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import org.apache.cassandra.utils.FBUtilities; + +public class QueryReplayer implements Closeable +{ +private final ExecutorService es = Executors.newFixedThreadPool(1); +private final Iterator> queryIterator; +private final List targetHosts; +private final List targetClusters; +private final List> filters; +private final List sessions; +private final ResultHandler resultHandler; +private final MetricRegistry metrics = new MetricRegistry(); +private final boolean debug; +private final PrintStream out; + +public QueryReplayer(Iterator> queryIterator, + List targetHosts, + List resultPaths, + List> filters, + PrintStream out, + String useKeyspace, + String queryFilePathString, + boolean debug) +{ +this.queryIterator = queryIterator; +this.targetHosts = targetHosts; +targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList()); +this.filters = filters; +sessions = useKeyspace != null ? + targetClusters.stream().map(c -> c.connect(useKeyspace)).collect(Collectors.toList()) : + targetClusters.stream().map(Cluster::connect).collect(Collectors.toList()); +File queryFilePath = queryFilePathString != null ? 
new File(queryFilePathString) : null; +resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath); +this.debug = debug; +this.out = out; +} + +public void replay() +{ +while (queryIterator.hasNext()) +{ +List queries = queryIterator.next(); +for (FQLQuery query : queries) +{ +if (filters.stream().anyMatch(f -> !f.test(query))) +continue; +try (Timer.Context ctx = metrics.timer("queries").time()) +{ + List> results = new ArrayList<>(sessions.size()); +for (Session session : sessions) +{ +try +{ +if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace())) +{ +if (debug) +{ +out.println(String.format("Switching keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspac
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213563877 --- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQueryReader.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import com.datastax.driver.core.BatchStatement; +import io.netty.buffer.Unpooled; +import net.openhft.chronicle.core.io.IORuntimeException; +import net.openhft.chronicle.wire.ReadMarshallable; +import net.openhft.chronicle.wire.ValueIn; +import net.openhft.chronicle.wire.WireIn; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.transport.ProtocolVersion; + +public class FQLQueryReader implements ReadMarshallable +{ +private FQLQuery query; +private final boolean legacyFiles; + +public FQLQueryReader() +{ +this(false); +} + +public FQLQueryReader(boolean legacyFiles) +{ +this.legacyFiles = legacyFiles; +} + +public void readMarshallable(WireIn wireIn) throws IORuntimeException +{ +String type = wireIn.read("type").text(); +int protocolVersion = wireIn.read("protocol-version").int32(); +QueryOptions queryOptions = QueryOptions.codec.decode(Unpooled.wrappedBuffer(wireIn.read("query-options").bytes()), ProtocolVersion.decode(protocolVersion)); +long queryTime = wireIn.read("query-time").int64(); +String keyspace = legacyFiles ? 
null : wireIn.read("keyspace").text(); +switch (type) +{ +case "single": +String queryString = wireIn.read("query").text(); +query = new FQLQuery.Single(keyspace, protocolVersion, queryOptions, queryTime, queryString, queryOptions.getValues()); +break; +case "batch": +BatchStatement.Type batchType = BatchStatement.Type.valueOf(wireIn.read("batch-type").text()); +ValueIn in = wireIn.read("queries"); +int queryCount = in.int32(); + +List queries = new ArrayList<>(queryCount); +for (int i = 0; i < queryCount; i++) +queries.add(in.text()); +in = wireIn.read("values"); +int valueCount = in.int32(); +List> values = new ArrayList<>(valueCount); +for (int ii = 0; ii < valueCount; ii++) +{ +List subValues = new ArrayList<>(); +values.add(subValues); +int numSubValues = in.int32(); +for (int zz = 0; zz < numSubValues; zz++) +{ --- End diff -- Redundant braces --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213564246 --- Diff: src/java/org/apache/cassandra/tools/fqltool/ResultComparator.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.collect.Streams; + +public class ResultComparator +{ +/** + * Compares the rows in rows + * the row at position x in rows will have come from host at position x in targetHosts + */ +public boolean compareRows(List targetHosts, FQLQuery query, List rows) +{ +if (rows.size() < 2 || rows.stream().allMatch(Objects::isNull)) +return true; + +if (rows.stream().anyMatch(Objects::isNull)) +{ +handleMismatch(targetHosts, query, rows); +return false; +} + +ResultHandler.ComparableRow ref = rows.get(0); +boolean equal = true; +for (int i = 1; i < rows.size(); i++) +{ +ResultHandler.ComparableRow compare = rows.get(i); +if (!ref.equals(compare)) +{ +equal = false; +handleMismatch(targetHosts, query, rows); +} +} +return equal; +} + +/** + * Compares the column definitions + * + * the column definitions at position x in cds will have come from host at position x in targetHosts + */ +public boolean compareColumnDefinitions(List targetHosts, FQLQuery query, List cds) +{ +if (cds.size() < 2) +return true; + +boolean equal = true; +List refDefs = cds.get(0).asList(); +for (int i = 1; i < cds.size(); i++) +{ +List toCompare = cds.get(i).asList(); +if (!refDefs.equals(toCompare)) +{ +handleColumnDefMismatch(targetHosts, query, cds); +equal = false; +} +} +return equal; +} + +private void handleMismatch(List targetHosts, FQLQuery query, List rows) +{ +System.out.println("MISMATCH:"); +System.out.println("Query = " + query); +System.out.println("Results:"); +System.out.println(Streams.zip(rows.stream(), targetHosts.stream(), (r, host) -> String.format("%s: %s%n", host, r == null ? 
"null" : r)).collect(Collectors.joining())); +} + +private void handleColumnDefMismatch(List targetHosts, FQLQuery query, List cds) +{ +System.out.println("COLUMN DEFINITION MISMATCH:"); +System.out.println("Query = " + query); +System.out.println("Results: "); +System.out.println(Streams.zip(cds.stream(), targetHosts.stream(), (cd, host) -> String.format("%s: %s%n", host, columnDefinitionsString(cd))).collect(Collectors.joining())); +} + +private String columnDefinitionsString(ResultHandler.ComparableColumnDefinitions cd) +{ +StringBuilder sb = new StringBuilder(); +if (cd == null) +{ --- End diff -- Redundant braces --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213562548 --- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; --- End diff -- unused import. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213564366 --- Diff: src/java/org/apache/cassandra/tools/fqltool/ResultHandler.java --- @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.fqltool; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; + +import com.datastax.driver.core.ResultSet; --- End diff -- Unused import. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213558547 --- Diff: src/java/org/apache/cassandra/tools/fqltool/QueryReplayer.java --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.tools.fqltool;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nullable;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class QueryReplayer implements Closeable
+{
+    private final ExecutorService es = Executors.newFixedThreadPool(1);
+    private final Iterator<List<FQLQuery>> queryIterator;
+    private final List<String> targetHosts;
+    private final List<Cluster> targetClusters;
+    private final List<Predicate<FQLQuery>> filters;
+    private final List<Session> sessions;
+    private final ResultHandler resultHandler;
+    private final MetricRegistry metrics = new MetricRegistry();
+    private final boolean debug;
+    private final PrintStream out;
+
+    public QueryReplayer(Iterator<List<FQLQuery>> queryIterator,
+                         List<String> targetHosts,
+                         List<File> resultPaths,
+                         List<Predicate<FQLQuery>> filters,
+                         PrintStream out,
+                         String useKeyspace,
+                         String queryFilePathString,
+                         boolean debug)
+    {
+        this.queryIterator = queryIterator;
+        this.targetHosts = targetHosts;
+        targetClusters = targetHosts.stream().map(h -> Cluster.builder().addContactPoint(h).build()).collect(Collectors.toList());
+        this.filters = filters;
+        sessions = useKeyspace != null ?
+                   targetClusters.stream().map(c -> c.connect(useKeyspace)).collect(Collectors.toList()) :
+                   targetClusters.stream().map(Cluster::connect).collect(Collectors.toList());
+        File queryFilePath = queryFilePathString != null ?
new File(queryFilePathString) : null; +resultHandler = new ResultHandler(targetHosts, resultPaths, queryFilePath); +this.debug = debug; +this.out = out; +} + +public void replay() +{ +while (queryIterator.hasNext()) +{ +List queries = queryIterator.next(); +for (FQLQuery query : queries) +{ +if (filters.stream().anyMatch(f -> !f.test(query))) +continue; +try (Timer.Context ctx = metrics.timer("queries").time()) +{ + List> results = new ArrayList<>(sessions.size()); +for (Session session : sessions) +{ +try +{ +if (query.keyspace != null && !query.keyspace.equals(session.getLoggedKeyspace())) +{ +if (debug) +{ +out.println(String.format("Switching keyspace from %s to %s", session.getLoggedKeyspace(), query.keyspace)); +
[GitHub] cassandra pull request #255: Marcuse/14618
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/255#discussion_r213562817 --- Diff: src/java/org/apache/cassandra/tools/fqltool/FQLQuery.java --- @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.fqltool; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.cassandra.audit.FullQueryLogger; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.binlog.BinLog; + +public abstract class FQLQuery implements Comparable +{ +public final long queryTime; +public final QueryOptions queryOptions; +public final int protocolVersion; +public final String keyspace; + +public FQLQuery(String keyspace, int protocolVersion, QueryOptions queryOptions, long queryTime) --- End diff -- Is there a reason why you didn't use `o.a.c.t.ProtocolVersion`? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r210488658 --- Diff: src/java/org/apache/cassandra/metrics/MaxSampler.java ---
@@ -0,0 +1,59 @@
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+
+public abstract class MaxSampler<T> extends Sampler<T>
+{
+    private int capacity;
+    private MinMaxPriorityQueue<Sample<T>> queue;
+    private long endTimeMillis = -1;
+    private final Comparator<Sample<T>> comp = Collections.reverseOrder(Comparator.comparing(p -> p.count));
+
+    public boolean isEnabled()
+    {
+        return endTimeMillis != -1 && clock.currentTimeMillis() <= endTimeMillis;
+    }
+
--- End diff -- Nit: Extra space.
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r210487768 --- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java --- @@ -132,6 +145,15 @@ public void close() { if (baseOp != null) baseOp.close(); + +if (startTimeNanos != -1) +{ +String cql = command.toCQLString(); +int timeMicros = (int) Math.min(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTimeNanos), Integer.MAX_VALUE); --- End diff -- This should be `o.a.c.u.Clock` --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/244 Stepping back a bit, I see the samplers are stateful classes that are enabled and disabled. This means that if there is an exception in the RMI thread executing `beginLocalSampling` and `finishLocalSampling`, the samplers will continue to run indefinitely, which might cause issues. It would be best to instantiate samplers on demand with a specific duration; each sampler can stop accepting new samples once the duration expires. This would also mean you no longer have to keep enabling and disabling samplers, allowing you to get rid of `enabled`, and other internal state could be made immutable: in `FrequencySampler`, for example, `StreamSummary` can be declared `final` and initialized in the constructor. If you want all samplers to start sampling at exactly the same moment (not sure if that is a requirement), you could potentially use a shared countdown latch; the thread instantiating the samplers decrements it once it has finished creating and initializing all of them. WDYT?
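The suggestion above can be sketched roughly as follows. This is an illustration only, not code from the patch; `DurationBoundedSampler` and its members are hypothetical names:

```java
import java.time.Duration;

// Hypothetical sketch: a sampler constructed with a fixed sampling window,
// so it expires on its own instead of relying on an external disable call.
class DurationBoundedSampler
{
    private final long endTimeMillis; // immutable once constructed

    DurationBoundedSampler(Duration samplingWindow)
    {
        this.endTimeMillis = System.currentTimeMillis() + samplingWindow.toMillis();
    }

    // No mutable 'enabled' flag: the sampler simply stops accepting
    // samples once the window has elapsed, even if the RMI caller dies.
    boolean isAccepting()
    {
        return System.currentTimeMillis() <= endTimeMillis;
    }

    public static void main(String[] args)
    {
        DurationBoundedSampler s = new DurationBoundedSampler(Duration.ofSeconds(5));
        System.out.println(s.isAccepting());
    }
}
```

With this shape, a crashed `finishLocalSampling` call can no longer leave a sampler running forever; the next `isAccepting()` check after the window ends returns false.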
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697381 --- Diff: src/java/org/apache/cassandra/metrics/FrequencySampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.metrics; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.clearspring.analytics.stream.StreamSummary; + +/** + * Find the most frequent sample. A sample adds to the sum of its key ie + * add("x", 10); and add("x", 20); will result in "x" = 30 This uses StreamSummary to only store the + * approximate cardinality (capacity) of keys. If the number of distinct keys exceed the capacity, the error of the + * sample may increase depending on distribution of keys among the total set. 
+ *
+ * @param <T>
+ */
+public abstract class FrequencySampler<T> extends Sampler<T>
+{
+    private static final Logger logger = LoggerFactory.getLogger(FrequencySampler.class);
+    private boolean enabled = false;
+
+    private StreamSummary<T> summary;
+
+    /**
+     * Start to record samples
+     *
+     * @param capacity
+     *            Number of sample items to keep in memory, the lower this is
+     *            the less accurate results are. For best results use value
+     *            close to cardinality, but understand the memory trade offs.
+     */
+    public synchronized void beginSampling(int capacity)
+    {
+        if (!enabled)
+        {
+            summary = new StreamSummary<>(capacity);
+            enabled = true;
+        }
+    }
+
+    /**
+     * Call to stop collecting samples, and gather the results
+     * @param count Number of most frequent items to return
+     */
+    public synchronized List<Sample<T>> finishSampling(int count)
+    {
+        List<Sample<T>> results = Collections.EMPTY_LIST;
--- End diff -- `Collections.emptyList()` is safer.
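For context on why `Collections.emptyList()` is preferred: `Collections.EMPTY_LIST` is a raw `java.util.List`, so assigning it to a parameterized type like `List<Sample<T>>` compiles only with an unchecked warning, while `emptyList()` infers the element type. A minimal illustration:

```java
import java.util.Collections;
import java.util.List;

class EmptyListDemo
{
    public static void main(String[] args)
    {
        // Raw constant: needs an unchecked suppression to assign cleanly.
        @SuppressWarnings("unchecked")
        List<String> raw = Collections.EMPTY_LIST;

        // Typed factory method: the element type is inferred, no warning.
        List<String> typed = Collections.emptyList();

        // Both refer to the same immutable empty list at runtime.
        System.out.println(raw.isEmpty() && typed.isEmpty());
    }
}
```

The two are identical at runtime; the difference is purely compile-time type safety.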
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697017 --- Diff: src/java/org/apache/cassandra/metrics/TableMetrics.java --- @@ -281,7 +301,7 @@ public Long getValue() public final Meter readRepairRequests; public final Meter shortReadProtectionRequests; -public final Map> samplers; +public final Map> samplers; --- End diff -- This can be replaced with an `EnumMap`. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
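For illustration, an `EnumMap` keyed on the patch's `SamplerType` enum would look like the sketch below. The enum constants are copied from the diff; the value type is a stand-in for the real `Sampler` class:

```java
import java.util.EnumMap;
import java.util.Map;

class EnumMapDemo
{
    // Stand-in for o.a.c.metrics.Sampler.SamplerType from the patch.
    enum SamplerType { READS, WRITES, LOCAL_READ_TIME, WRITE_SIZE, CAS_CONTENTIONS }

    public static void main(String[] args)
    {
        // EnumMap is backed by an array indexed by ordinal(): more compact
        // and faster than a HashMap when all keys are constants of one enum.
        Map<SamplerType, String> samplers = new EnumMap<>(SamplerType.class);
        samplers.put(SamplerType.READS, "top reads sampler");
        System.out.println(samplers.containsKey(SamplerType.READS));
    }
}
```

`EnumMap` also iterates in declaration order of the enum, which keeps nodetool output stable.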
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697707 --- Diff: src/java/org/apache/cassandra/metrics/Sampler.java ---
@@ -0,0 +1,67 @@
+package org.apache.cassandra.metrics;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public abstract class Sampler<T>
--- End diff -- It would be nice to have a JMH benchmark for the new `Sampler`s.
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697487 --- Diff: src/java/org/apache/cassandra/metrics/Sampler.java ---
@@ -0,0 +1,67 @@
+package org.apache.cassandra.metrics;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public abstract class Sampler<T>
+{
+    public enum SamplerType
+    {
+        READS, WRITES, LOCAL_READ_TIME, WRITE_SIZE, CAS_CONTENTIONS
+    }
+
+    @VisibleForTesting
+    static final ThreadPoolExecutor samplerExecutor = new JMXEnabledThreadPoolExecutor(1, 1,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        new NamedThreadFactory("Sampler"),
+        "internal");
+
+    public void addSample(final T item, final int value)
+    {
+        if (isEnabled())
+            samplerExecutor.execute(() -> insert(item, value));
+    }
+
+    protected abstract void insert(T item, long value);
+
+    public abstract boolean isEnabled();
+
+    public abstract void beginSampling(int capacity);
+
+    public abstract List<Sample<T>> finishSampling(int count);
+
+    public abstract String toString(T value);
+
+    /**
+     * Represents the ranked items collected during a sample period
+     */
+    public static class Sample<S> implements Serializable
+    {
+
--- End diff -- Extra white space?
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697651 --- Diff: src/java/org/apache/cassandra/tools/nodetool/ProfileLoad.java --- @@ -0,0 +1,178 @@ +package org.apache.cassandra.tools.nodetool; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; + +import org.apache.cassandra.metrics.Sampler.SamplerType; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Pair; + +import com.google.common.collect.Lists; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; + +@Command(name = "profileload", description = "Low footprint profiling of activity for a period of time") +public class ProfileLoad extends NodeToolCmd +{ +@Arguments(usage = " ", description = "The keyspace, column family name, and duration in milliseconds") +private List args = new ArrayList<>(); + +@Option(name = "-s", description = "Capacity of the sampler, higher for more accuracy (Default: 256)") +private int capacity = 256; + +@Option(name = "-k", description = "Number of the top samples to list (Default: 10)") +private int topCount = 10; + +@Option(name = "-a", description = "Comma separated list of samplers to use (Default: all)") +private String samplers = join(SamplerType.values(), ','); + +@Override +public void execute(NodeProbe probe) +{ +checkArgument(args.size() == 3 || args.size() == 1 || args.size() == 0, "Invalid arguments, either [keyspace 
table duration] or [duration] or no args"); +checkArgument(topCount < capacity, "TopK count (-k) option must be smaller then the summary capacity (-s)"); +String keyspace = null; +String table = null; +Integer duration = 1; +if(args.size() == 3) +{ +keyspace = args.get(0); +table = args.get(1); +duration = Integer.valueOf(args.get(2)); +} +else if (args.size() == 1) +{ +duration = Integer.valueOf(args.get(0)); +} +// generate the list of samplers +List targets = Lists.newArrayList(); +List available = Arrays.stream(SamplerType.values()).map(Enum::toString).collect(Collectors.toList()); +for (String s : samplers.split(",")) +{ +String sampler = s.trim().toUpperCase(); +checkArgument(available.contains(sampler), String.format("'%s' sampler is not available from: %s", s, Arrays.toString(SamplerType.values(; +targets.add(sampler); +} + +Map> results; +try +{ +if (keyspace == null) +{ --- End diff -- Braces are unnecessary. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697217 --- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java --- @@ -113,6 +123,11 @@ static ReadExecutionController forCommand(ReadCommand command) throw e; } } +if (baseCfs.metric.topLocalReadQueryTime.isEnabled()) +{ --- End diff -- Single line if conditions don't need braces. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697430 --- Diff: src/java/org/apache/cassandra/metrics/FrequencySampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.metrics; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.clearspring.analytics.stream.StreamSummary; + +/** + * Find the most frequent sample. A sample adds to the sum of its key ie + * add("x", 10); and add("x", 20); will result in "x" = 30 This uses StreamSummary to only store the + * approximate cardinality (capacity) of keys. If the number of distinct keys exceed the capacity, the error of the + * sample may increase depending on distribution of keys among the total set. 
+ *
+ * @param <T>
+ */
+public abstract class FrequencySampler<T> extends Sampler<T>
+{
+    private static final Logger logger = LoggerFactory.getLogger(FrequencySampler.class);
+    private boolean enabled = false;
+
+    private StreamSummary<T> summary;
+
+    /**
+     * Start to record samples
+     *
+     * @param capacity
+     *            Number of sample items to keep in memory, the lower this is
+     *            the less accurate results are. For best results use value
+     *            close to cardinality, but understand the memory trade offs.
+     */
+    public synchronized void beginSampling(int capacity)
+    {
+        if (!enabled)
+        {
+            summary = new StreamSummary<>(capacity);
+            enabled = true;
+        }
+    }
+
+    /**
+     * Call to stop collecting samples, and gather the results
+     * @param count Number of most frequent items to return
+     */
+    public synchronized List<Sample<T>> finishSampling(int count)
+    {
+        List<Sample<T>> results = Collections.EMPTY_LIST;
+        if (enabled)
+        {
+            enabled = false;
+            results = summary.topK(count)
+                             .stream()
+                             .map(c -> new Sample<>(c.getItem(), c.getCount(), c.getError()))
+                             .collect(Collectors.toList());
+        }
+        return results;
+    }
+
+    protected synchronized void insert(final T item, final long value)
+    {
+        // samplerExecutor is single threaded but still need
+        // synchronization against jmx calls to finishSampling
+        if (enabled && value > 0)
+        {
+            try
+            {
+                summary.offer(item, (int) Math.min(value, Integer.MAX_VALUE));
+            } catch (Exception e)
+            {
+                logger.trace("Failure to offer sample", e);
+            }
+        }
+    }
+
+    public boolean isEnabled()
+    {
+        return enabled;
+    }
+
+    public void setEnabled(boolean enabled)
+    {
+        this.enabled = enabled;
--- End diff -- This allows the user of the class to enable the `FrequencySampler` without actually initializing the `summary` variable. This will cause an NPE.
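One way to close that hole, sketched below with hypothetical code (`GuardedSampler` is not from the patch, and `Object` stands in for `StreamSummary<T>`), is to make `setEnabled` refuse to enable the sampler before the summary exists:

```java
class GuardedSampler
{
    private boolean enabled = false;
    private Object summary; // stands in for StreamSummary<T>

    // Enabling without a summary would NPE later in insert(); reject it
    // here instead of blindly flipping the flag.
    public synchronized void setEnabled(boolean enabled)
    {
        if (enabled && summary == null)
            throw new IllegalStateException("call beginSampling() before enabling");
        this.enabled = enabled;
    }

    public synchronized void beginSampling(int capacity)
    {
        summary = new Object(); // would be new StreamSummary<>(capacity)
        enabled = true;
    }

    public synchronized boolean isEnabled() { return enabled; }

    public static boolean enableWithoutBeginThrows()
    {
        try
        {
            new GuardedSampler().setEnabled(true);
            return false;
        }
        catch (IllegalStateException e)
        {
            return true; // the guard fired as intended
        }
    }

    public static boolean enableAfterBeginWorks()
    {
        GuardedSampler s = new GuardedSampler();
        s.beginSampling(16);
        return s.isEnabled();
    }
}
```

Alternatively, `setEnabled` could lazily initialize the summary; the important point is that the flag and the summary can never disagree.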
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697637 --- Diff: src/java/org/apache/cassandra/tools/nodetool/ProfileLoad.java --- @@ -0,0 +1,178 @@ +package org.apache.cassandra.tools.nodetool; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.commons.lang3.StringUtils.join; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; + +import org.apache.cassandra.metrics.Sampler.SamplerType; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool.NodeToolCmd; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Pair; + +import com.google.common.collect.Lists; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; + +@Command(name = "profileload", description = "Low footprint profiling of activity for a period of time") +public class ProfileLoad extends NodeToolCmd +{ +@Arguments(usage = " ", description = "The keyspace, column family name, and duration in milliseconds") +private List args = new ArrayList<>(); + +@Option(name = "-s", description = "Capacity of the sampler, higher for more accuracy (Default: 256)") +private int capacity = 256; + +@Option(name = "-k", description = "Number of the top samples to list (Default: 10)") +private int topCount = 10; + +@Option(name = "-a", description = "Comma separated list of samplers to use (Default: all)") +private String samplers = join(SamplerType.values(), ','); + +@Override +public void execute(NodeProbe probe) +{ +checkArgument(args.size() == 3 || args.size() == 1 || args.size() == 0, "Invalid arguments, either [keyspace 
table duration] or [duration] or no args"); +checkArgument(topCount < capacity, "TopK count (-k) option must be smaller then the summary capacity (-s)"); +String keyspace = null; +String table = null; +Integer duration = 1; --- End diff -- What is the unit for `duration`? It might be better to just use `java.time.Duration`? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
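For illustration, `java.time.Duration` makes the unit explicit in the type instead of a bare `Integer` whose unit lives only in documentation. `DurationArgDemo` and `parseDurationMillis` are hypothetical names, not code from the patch:

```java
import java.time.Duration;

class DurationArgDemo
{
    // Hypothetical: parse the nodetool argument as a millisecond count,
    // but carry it around as a Duration so the unit can't be confused.
    public static Duration parseDurationMillis(String arg)
    {
        return Duration.ofMillis(Long.parseLong(arg));
    }

    public static void main(String[] args)
    {
        Duration d = parseDurationMillis("10000");
        // Callers ask for the unit they want explicitly.
        System.out.println(d.getSeconds()); // 10
    }
}
```

Any later conversion (seconds for display, millis for `Thread.sleep`) is then a method call rather than a mental note.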
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697287 --- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java --- @@ -132,6 +147,17 @@ public void close() { if (baseOp != null) baseOp.close(); + +if (startTime != -1) +{ +String cql = command.toCQLString(); +int time = (int) Math.min(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime), Integer.MAX_VALUE); --- End diff -- `timeMillis` or `millis` for brevity? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697244 --- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java --- @@ -113,6 +123,11 @@ static ReadExecutionController forCommand(ReadCommand command) throw e; } } +if (baseCfs.metric.topLocalReadQueryTime.isEnabled()) +{ +result.startTime = System.nanoTime(); --- End diff -- I prefer naming primitive variables with units for example - `startTimeNanos`. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #244: Refactor and add samplers for CASSANDRA-14436
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/244#discussion_r207697297 --- Diff: src/java/org/apache/cassandra/db/ReadExecutionController.java --- @@ -132,6 +147,17 @@ public void close() { if (baseOp != null) baseOp.close(); + +if (startTime != -1) +{ +String cql = command.toCQLString(); +int time = (int) Math.min(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime), Integer.MAX_VALUE); +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(baseMetadata.id); +if(cfs != null) +{ --- End diff -- You can skip braces for single line if conditions. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey made a few more changes:
1. Got rid of `IStreamWriter`
2. Ensured we're logging the configuration warning only once at startup, and only if zero copy streaming is enabled
3. A few stylistic changes
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205646170 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Iterators;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public final class ComponentManifest implements Iterable<Component>
+{
+    private final LinkedHashMap<Component, Long> components;
+
+    public ComponentManifest(Map<Component, Long> components)
+    {
+        this.components = new LinkedHashMap<>(components);
+    }
+
+    public long sizeOf(Component component)
+    {
+        Long size = components.get(component);
+        if (size == null)
+            throw new IllegalArgumentException("Component " + component + " is not present in the manifest");
+        return size;
+    }
+
+    public long totalSize()
+    {
+        long totalSize = 0;
+        for (Long size : components.values())
+            totalSize += size;
+        return totalSize;
+    }
+
+    public List<Component> components()
+    {
+        return new ArrayList<>(components.keySet());
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ComponentManifest))
+            return false;
+
+        ComponentManifest that = (ComponentManifest) o;
+        return components.equals(that.components);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return components.hashCode();
+    }
+
+    public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>()
+    {
+        public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUnsignedVInt(manifest.components.size());
+            for (Map.Entry<Component, Long> entry : manifest.components.entrySet())
+            {
+                out.writeByte(entry.getKey().type.id);
--- End diff -- Done. I'm just using `component.name`. I think this should be sufficient for this PR's scope.
--- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205639791 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); + +private final TableId tableId; +private final StreamSession session; +private final CassandraStreamHeader header; +private final int fileSequenceNumber; + +public CassandraBlockStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(messageHeader.pendingRepair)) +throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId)); +} + +this.header = streamHeader; +this.session = session; +this.tableId = messageHeader.tableId; +this.fileSequenceNumber = messageHeader.sequenceNumber; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); +if (cfs == null) +{ +// schema was dropped during streaming +throw new IOException("Table " + tableId + " was dropped during streaming"); +} + +ComponentManifest manifest = header.componentManifest; +long totalSize = manifest.totalSize(); + +logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", + session.planId(), + fileSequenceNumber, + session.peer, + prettyPrintMemory(totalSize), + cfs.metadata()); + +
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532095 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.totalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", + session.planId(), + sstable.getFilename(), + session.peer, + sstable.getSSTableMetadata().repairedAt, + prettyPrintMemory(totalSize)); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.components()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), + sstable.getKeyspaceName(), + sstable.getColumnFamilyName(), + sstable.descriptor.generation, + component, length); --- End diff -- Fixed. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532049 --- Diff: test/unit/org/apache/cassandra/db/streaming/CassandraStreamHeaderTest.java --- @@ -43,8 +51,38 @@ public void serializerTest() new ArrayList<>(), ((CompressionMetadata) null), 0, - SerializationHeader.makeWithoutStats(metadata).toComponent()); + SerializationHeader.makeWithoutStats(metadata).toComponent(), + metadata.id); SerializationUtils.assertSerializationCycle(header, CassandraStreamHeader.serializer); } + +@Test +public void serializerTest_FullSSTableTransfer() +{ +String ddl = "CREATE TABLE tbl (k INT PRIMARY KEY, v INT)"; +TableMetadata metadata = CreateTableStatement.parse(ddl, "ks").build(); + +ComponentManifest manifest = new ComponentManifest(new HashMap(ImmutableMap.of(Component.DATA, 100L))); --- End diff -- Fixed. Not sure why I did this in the first place.
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205532001 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java --- @@ -183,9 +261,26 @@ public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws I sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); int sstableLevel = in.readInt(); + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); -return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); +TableId tableId = TableId.deserialize(in); +boolean fullStream = in.readBoolean(); +ComponentManifest manifest = null; +DecoratedKey firstKey = null; + +if (fullStream) +{ +manifest = ComponentManifest.serializer.deserialize(in, version); +ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in); +IPartitioner partitioner = partitionerMapper.apply(tableId); +if (partitioner == null) +throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId {}", tableId)); --- End diff -- Fixed. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I've addressed your comments, including the one about disabling faster streaming for legacy counter shards. I also added a much less expensive check for STCS. It won't identify all SSTables accurately, but it is far cheaper than what I have for LCS. Let me know your thoughts.
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205339231 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; --- End diff -- I did rename this. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205299016 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); --- End diff -- Yes, this *should* be the same. I avoided making that assumption and instead use what `writeToChannel()` returns. 
It is also used to update `session.progress()`. Ideally `bytesRead == length`; anything else indicates a bug. I did rewrite this to make it more concise.
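The pattern agreed on in this exchange — accumulate what `writeToChannel()` actually returns rather than assuming it equals the file length — can be sketched as below. `ChannelTransfer` and `ProgressTracker` are hypothetical names, not the Cassandra API:

```java
// Sketch of the accounting pattern: trust the byte count the transfer call
// actually returns instead of assuming it equals the requested length.
interface ChannelTransfer
{
    long writeToChannel(long length); // returns the bytes actually written
}

class ProgressTracker
{
    private long progress;

    long transfer(ChannelTransfer channel, long length)
    {
        long written = channel.writeToChannel(length);
        // Ideally written == length; accumulating the returned value means a
        // short write shows up in the session progress instead of being hidden.
        progress += written;
        return written;
    }

    long progress()
    {
        return progress;
    }
}
```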
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205298316 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); --- End diff -- Fixed. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205298028 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); +progress += bytesRead; + +session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesRead, + length); + 
+logger.debug("[Stream #{}] Finished block streaming {}.{} gen {} component {} to {}, xfered = {}, length = {}, totalSize = {}", + session.planId(), sstable.getKeyspaceName(), sstable.getColumnFamilyName(),
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205297956 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamWriter.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; + +/** + * CassandraBlockStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraBlockStreamWriter implements IStreamWriter +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamWriter.class); + +private final SSTableReader sstable; +private final ComponentManifest manifest; +private final StreamSession session; +private final StreamRateLimiter limiter; + +public CassandraBlockStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) +{ +this.session = session; +this.sstable = sstable; +this.manifest = manifest; +this.limiter = StreamManager.getRateLimiter(session.peer); +} + +/** + * Stream the entire file to given channel. + * + * + * @param output where this writes data to + * @throws IOException on any I/O error + */ +@Override +public void write(DataOutputStreamPlus output) throws IOException +{ +long totalSize = manifest.getTotalSize(); +logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + +long progress = 0L; +ByteBufDataOutputStreamPlus byteBufDataOutputStreamPlus = (ByteBufDataOutputStreamPlus) output; + +for (Component component : manifest.getComponents()) +{ +@SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus +FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + +// Total Length to transmit for this file +long length = in.size(); + +// tracks write progress +long bytesRead = 0; +logger.debug("[Stream #{}] Block streaming {}.{} gen {} component {} size {}", session.planId(), +sstable.getKeyspaceName(), sstable.getColumnFamilyName(), sstable.descriptor.generation, component, length); + +bytesRead += byteBufDataOutputStreamPlus.writeToChannel(in, limiter); +progress += bytesRead; + +session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesRead, + length); + 
+logger.debug("[Stream #{}] Finished block streaming {}.{} gen {} component {} to {}, xfered = {}, length = {}, totalSize = {}", + session.planId(), sstable.getKeyspaceName(), sstable.getColumnFamilyName(),
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205297558 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java --- @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentManifest +{ +private final LinkedHashMap manifest; +private final Set components = new LinkedHashSet<>(Component.Type.values().length); +private final long totalSize; + +public ComponentManifest(Map componentManifest) +{ +this.manifest = new LinkedHashMap<>(componentManifest); + +long size = 0; +for (Map.Entry entry : this.manifest.entrySet()) +{ +size += entry.getValue(); +this.components.add(Component.parse(entry.getKey().repr)); +} + +this.totalSize = size; +} + +public Long getSizeForType(Component.Type type) +{ +return manifest.get(type); +} + +public long getTotalSize() +{ +return totalSize; +} + +public Set getComponents() +{ +return Collections.unmodifiableSet(components); +} + +public boolean equals(Object o) +{ +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; +ComponentManifest that = (ComponentManifest) o; +return totalSize == that.totalSize && + Objects.equals(manifest, that.manifest); +} + +public int hashCode() +{ + +return Objects.hash(manifest, totalSize); +} + +public static final IVersionedSerializer serializer = new IVersionedSerializer() +{ +public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException +{ +out.writeInt(manifest.manifest.size()); +for (Map.Entry entry : manifest.manifest.entrySet()) +serialize(entry.getKey(), entry.getValue(), out); +} + +public ComponentManifest deserialize(DataInputPlus in, int version) throws IOException +{ 
+LinkedHashMap components = new LinkedHashMap<>(Component.Type.values().length); + +int size = in.readInt(); +assert size >= 0 : "Invalid number of components"; + +for (int i = 0; i < size; i++) +{ +Component.Type type = Component.Type.fromRepresentation(in.readByte()); +long length = in.readLong(); +components.put(type, length); +} + +return new ComponentManifest(components); +} + +public long serializedSize(ComponentManifest manifest, int version) +{ +long size = 0; +size += TypeSizes.sizeof(manifest.manifest.size()); +for (Map.Entry entry : manifest.manifest.entrySet()) +{ +size += TypeSizes.sizeof(entry.getKey().id); +size += TypeSizes.sizeof(entry.getValue()); +} +return size; +} + +private void serialize(Component.Type type, long size, DataOutputPlus out) th
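The serializer quoted above writes an int count followed by one (type, length) pair per component. A minimal, self-contained sketch of that wire-format round-trip, using plain `java.io` streams and a hypothetical one-byte id per component type in place of Cassandra's `DataOutputPlus`/`Component.Type`, might look like:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

public class ManifestCodec
{
    // Wire format, mirroring the quoted serializer: an int count, then one
    // (byte id, long length) pair per component, in insertion order.
    static byte[] serialize(Map<Byte, Long> manifest) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(manifest.size());
        for (Map.Entry<Byte, Long> entry : manifest.entrySet())
        {
            out.writeByte(entry.getKey());
            out.writeLong(entry.getValue());
        }
        return bytes.toByteArray();
    }

    static Map<Byte, Long> deserialize(byte[] data) throws IOException
    {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int size = in.readInt();
        if (size < 0)
            throw new IOException("Invalid number of components: " + size);
        Map<Byte, Long> manifest = new LinkedHashMap<>(size);
        for (int i = 0; i < size; i++)
            manifest.put(in.readByte(), in.readLong());
        return manifest;
    }

    public static void main(String[] args) throws IOException
    {
        Map<Byte, Long> manifest = new LinkedHashMap<>();
        manifest.put((byte) 0, 1024L); // hypothetical Data component, 1 KiB
        manifest.put((byte) 1, 64L);   // hypothetical Index component
        System.out.println(deserialize(serialize(manifest)).equals(manifest)); // true
    }
}
```

Using a `LinkedHashMap` preserves insertion order, so the components stream back in the same sequence they were written, which is what the real manifest relies on when replaying writers.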
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205297151

--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentManifest
+{
+    private final LinkedHashMap<Component.Type, Long> manifest;
--- End diff --

I had originally thought of going in this direction. I have rewritten this class and incorporated your suggestion including most stylistic changes.

---

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205297184

--- Diff: src/java/org/apache/cassandra/db/streaming/ComponentManifest.java ---
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class ComponentManifest
+{
+    private final LinkedHashMap<Component.Type, Long> manifest;
+    private final Set<Component> components = new LinkedHashSet<>(Component.Type.values().length);
--- End diff --

Done.
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205283093 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, 
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new HashSet(components).removeAll(supportedComponents)); + +for (Component c : components) +componentWriters.put(c.type, makeWriter(descriptor, c, writerOption)); +} + +private static SequentialWriter makeWrit
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205282807

--- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter
+{
+    private final TableMetadataRef metadata;
+    private final LifecycleTransaction txn;
+    private volatile SSTableReader finalReader;
+    private final Map<Component.Type, SequentialWriter> componentWriters;
+
+    private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class);
+
+    private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+                                                                              .trickleFsync(false)
+                                                                              .bufferSize(2 * 1024 * 1024)
+                                                                              .bufferType(BufferType.OFF_HEAP)
+                                                                              .build();
+    public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                                      Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                                      Component.DIGEST, Component.CRC);
+
+    public BigTableBlockWriter(Descriptor descriptor,
+                               TableMetadataRef metadata,
+                               LifecycleTransaction txn,
+                               final Set<Component> components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata,
+              DatabaseDescriptor.getDiskOptimizationStrategy());
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.txn = txn;
+        this.componentWriters = new HashMap<>(components.size());
+
+        assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s",
+                                                                           new HashSet(components).removeAll(supportedComponents));
--- End diff --

Fixed.
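The assertion message in the quoted constructor is the pitfall this review caught: `Set.removeAll` returns a boolean indicating whether the set changed, not the leftover elements, so the formatted message would read "true" instead of naming the offending components. A small stand-alone demonstration, using plain strings in place of SSTable components:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RemoveAllPitfall
{
    public static void main(String[] args)
    {
        Set<String> supported = new HashSet<>(Arrays.asList("Data.db", "Index.db"));
        Set<String> requested = new HashSet<>(Arrays.asList("Data.db", "Summary.db"));

        // Bug pattern: removeAll mutates the copy and returns a boolean,
        // so formatting its result prints "true", not the bad components.
        Object message = new HashSet<>(requested).removeAll(supported);
        System.out.println("Unsupported streaming component detected " + message);

        // Fix: compute the difference first, then format the surviving set.
        Set<String> unsupported = new HashSet<>(requested);
        unsupported.removeAll(supported);
        System.out.println("Unsupported streaming component detected " + unsupported);
    }
}
```

Guava's `Sets.difference(requested, supported)` expresses the same fix without mutating a copy.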
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205282784

--- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java ---
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.sstable.format.big;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.SequentialWriter;
+import org.apache.cassandra.io.util.SequentialWriterOption;
+import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadataRef;
+
+public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter
+{
+    private final TableMetadataRef metadata;
+    private final LifecycleTransaction txn;
+    private volatile SSTableReader finalReader;
+    private final Map<Component.Type, SequentialWriter> componentWriters;
+
+    private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class);
+
+    private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder()
+                                                                              .trickleFsync(false)
+                                                                              .bufferSize(2 * 1024 * 1024)
+                                                                              .bufferType(BufferType.OFF_HEAP)
+                                                                              .build();
+    public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS,
+                                                                                      Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY,
+                                                                                      Component.DIGEST, Component.CRC);
+
+    public BigTableBlockWriter(Descriptor descriptor,
+                               TableMetadataRef metadata,
+                               LifecycleTransaction txn,
+                               final Set<Component> components)
+    {
+        super(descriptor, ImmutableSet.copyOf(components), metadata,
+              DatabaseDescriptor.getDiskOptimizationStrategy());
+        txn.trackNew(this);
+        this.metadata = metadata;
+        this.txn = txn;
--- End diff --

Removed.
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205277660 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +long totalSize = manifest.getTotalSize(); + +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + +if (cfs == nul
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205277545

--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+    private final TableId tableId;
+    private final StreamSession session;
+    private final int sstableLevel;
+    private final SerializationHeader.Component header;
--- End diff --

Cherry picked.
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205277492 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +long totalSize = manifest.getTotalSize(); + +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + +if (cfs == nul
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/239#discussion_r205276519

--- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java ---
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+
+import com.google.common.base.Throwables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableMultiWriter;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.StreamReceiver;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.messages.StreamMessageHeader;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk.
+ */
+public class CassandraBlockStreamReader implements IStreamReader
+{
+    private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class);
+    private final TableId tableId;
+    private final StreamSession session;
+    private final int sstableLevel;
+    private final SerializationHeader.Component header;
+    private final int fileSeqNum;
+    private final ComponentManifest manifest;
+    private final SSTableFormat.Type format;
+    private final Version version;
+    private final DecoratedKey firstKey;
+
+    public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session)
+    {
+        if (session.getPendingRepair() != null)
+        {
+            // we should only ever be streaming pending repair
+            // sstables if the session has a pending repair id
+            if (!session.getPendingRepair().equals(header.pendingRepair))
+                throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.",
--- End diff --

Great catch. Yes, it used to be a logging call.
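The exception message quoted above shows why the stray `{}` survived review: `String.format` only substitutes `%`-style specifiers, so an SLF4J-style `{}` placeholder passes through literally and the extra argument is silently ignored. A stand-alone demonstration (the table id is a made-up value for illustration):

```java
public class FormatPlaceholder
{
    public static void main(String[] args)
    {
        String tableId = "ks.tbl-0000"; // hypothetical id, for illustration only

        // SLF4J-style braces are not format specifiers: String.format leaves
        // them untouched and quietly drops the unused trailing argument.
        String broken = String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", tableId);

        // %s is what String.format actually substitutes.
        String fixed = String.format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", tableId);

        System.out.println(broken); // message still contains the literal "{}"
        System.out.println(fixed);  // message contains the table id
    }
}
```

This is an easy bug to introduce when converting a `logger.warn(...)` call (which does interpret `{}`) into a thrown exception, which is exactly what the reply says happened here.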
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205276269 --- Diff: src/java/org/apache/cassandra/io/sstable/format/big/BigTableBlockWriter.java --- @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.sstable.format.big; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.compress.BufferType; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTable; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.io.util.SequentialWriterOption; +import org.apache.cassandra.net.async.RebufferingByteBufDataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadataRef; + +public class BigTableBlockWriter extends SSTable implements SSTableMultiWriter +{ +private final TableMetadataRef metadata; +private final LifecycleTransaction txn; +private volatile SSTableReader finalReader; +private final Map<Component.Type, SequentialWriter> componentWriters; + +private final Logger logger = LoggerFactory.getLogger(BigTableBlockWriter.class); + +private final SequentialWriterOption writerOption = SequentialWriterOption.newBuilder() + .trickleFsync(false) + .bufferSize(2 * 1024 * 1024) + .bufferType(BufferType.OFF_HEAP) + .build(); +public static final ImmutableSet<Component> supportedComponents = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER,
Component.SUMMARY, + Component.DIGEST, Component.CRC); + +public BigTableBlockWriter(Descriptor descriptor, + TableMetadataRef metadata, + LifecycleTransaction txn, + final Set<Component> components) +{ +super(descriptor, ImmutableSet.copyOf(components), metadata, + DatabaseDescriptor.getDiskOptimizationStrategy()); +txn.trackNew(this); +this.metadata = metadata; +this.txn = txn; +this.componentWriters = new HashMap<>(components.size()); + +assert supportedComponents.containsAll(components) : String.format("Unsupported streaming component detected %s", + new HashSet<>(components).removeAll(supportedComponents)); + +for (Component c : components) +componentWriters.put(c.type, makeWriter(descriptor, c, writerOption)); +} + +private static SequentialWriter makeWrit
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205276122 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +private final TableId tableId; +private final StreamSession session; +private final int sstableLevel; +private final SerializationHeader.Component header; +private final int fileSeqNum; +private final ComponentManifest manifest; +private final SSTableFormat.Type format; +private final Version version; +private final DecoratedKey firstKey; + +public CassandraBlockStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) +{ +if (session.getPendingRepair() != null) +{ +// we should only ever be streaming pending repair +// sstables if the session has a pending repair id +if (!session.getPendingRepair().equals(header.pendingRepair)) +throw new IllegalStateException(String.format("Stream Session & SSTable ({}) pendingRepair UUID mismatch.", + header.tableId)); +} +this.session = session; +this.tableId = header.tableId; +this.manifest = streamHeader.componentManifest; +this.sstableLevel = streamHeader.sstableLevel; +this.header = streamHeader.header; +this.format = streamHeader.format; +this.fileSeqNum = header.sequenceNumber; +this.version = streamHeader.version; +this.firstKey = streamHeader.firstKey; +} + +/** + * @param inputPlus where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. + */ +@SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed +@Override +public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException +{ +long totalSize = manifest.getTotalSize(); + +ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + +if (cfs == nul
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r205275130 --- Diff: src/java/org/apache/cassandra/db/compaction/Verifier.java --- @@ -361,12 +361,26 @@ public RangeOwnHelper(List<Range<Token>> normalizedRanges) * @throws RuntimeException if the key is not contained */ public void check(DecoratedKey key) +{ +if (!checkBoolean(key)) --- End diff -- I am cherry-picking this change. My only concern is that we're now changing the signature and behavior of a pre-existing method. I generally avoid such changes so as not to confuse other authors.
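One way to address the signature concern raised above is to keep the original throwing `check(...)` contract and have it delegate to the new boolean variant, so pre-existing callers are untouched. A hedged sketch under that assumption, with simplified `long` keys and `{left, right}` ranges standing in for Cassandra's `DecoratedKey`/`Range` types:

```java
import java.util.List;

// Simplified stand-in for Verifier.RangeOwnHelper: the throwing check()
// keeps its old signature and delegates to a new non-throwing checkBoolean().
public class RangeOwnHelperSketch
{
    private final List<long[]> normalizedRanges; // each element is {left, right}, inclusive

    public RangeOwnHelperSketch(List<long[]> normalizedRanges)
    {
        this.normalizedRanges = normalizedRanges;
    }

    // New variant: reports containment without using exceptions for control flow.
    public boolean checkBoolean(long key)
    {
        for (long[] r : normalizedRanges)
            if (key >= r[0] && key <= r[1])
                return true;
        return false;
    }

    // Original contract preserved: still throws if the key is not contained.
    public void check(long key)
    {
        if (!checkBoolean(key))
            throw new RuntimeException("key " + key + " is not contained in any range");
    }
}
```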
[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I *think* I've resolved all your comments. The last dtest run was all green. I'm not sure about the latest push, but I don't foresee any breakages. Please let me know if there are more changes required.
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204998639 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java --- @@ -62,7 +90,23 @@ public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, sections, sstable.compression ? sstable.getCompressionMetadata() : null, keepSSTableLevel ? sstable.getSSTableLevel() : 0, - sstable.header.toComponent()); + sstable.header.toComponent(), manifest, shouldStreamFullSSTable(), +sstable.first, +sstable.metadata().id); +} + +@VisibleForTesting +public static ComponentManifest getComponentManifest(SSTableReader sstable) +{ +LinkedHashMap components = new LinkedHashMap<>(STREAM_COMPONENTS.size()); +for (Component component : STREAM_COMPONENTS) --- End diff -- @iamaleksey would it be worth having a list of required components here? Would any one of them being missing be fatal? For example, when I reset the level on the receiving side, I expect stats to exist and to be sent over.
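The required-components question above can be sketched as a manifest builder that preserves iteration order (as the `LinkedHashMap` in the diff does) and fails fast when a must-have component is absent. Component names and the split between streamable and required sets are illustrative assumptions, not Cassandra's actual `ComponentManifest`:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: build an order-preserving component -> length manifest, then
// validate that required components (e.g. stats, needed to reset the level
// on the receiving side) are actually present.
public class ManifestSketch
{
    static final List<String> STREAM_COMPONENTS =
        List.of("Data.db", "Index.db", "Statistics.db", "Filter.db");
    static final List<String> REQUIRED_COMPONENTS =
        List.of("Data.db", "Statistics.db"); // hypothetical required subset

    public static Map<String, Long> buildManifest(Map<String, Long> onDiskLengths)
    {
        LinkedHashMap<String, Long> manifest = new LinkedHashMap<>(STREAM_COMPONENTS.size());
        for (String c : STREAM_COMPONENTS)
        {
            Long len = onDiskLengths.get(c); // null when the component is absent on disk
            if (len != null)
                manifest.put(c, len);
        }
        for (String required : REQUIRED_COMPONENTS)
            if (!manifest.containsKey(required))
                throw new IllegalStateException("missing required component: " + required);
        return manifest;
    }
}
```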
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204955316 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java --- @@ -114,13 +155,51 @@ public void write(StreamSession session, DataOutputStreamPlus out, int version) CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); -CassandraStreamWriter writer = header.compressionInfo == null ? - new CassandraStreamWriter(sstable, header.sections, session) : - new CompressedCassandraStreamWriter(sstable, header.sections, - header.compressionInfo, session); +IStreamWriter writer; +if (shouldStreamFullSSTable()) +{ +writer = new CassandraBlockStreamWriter(sstable, session, components); +} +else +{ +writer = (header.compressionInfo == null) ? + new CassandraStreamWriter(sstable, header.sections, session) : + new CompressedCassandraStreamWriter(sstable, header.sections, + header.compressionInfo, session); +} writer.write(out); } +@VisibleForTesting +public boolean shouldStreamFullSSTable() +{ +return isFullSSTableTransfersEnabled && isFullyContained; +} +@VisibleForTesting +public boolean fullyContainedIn(List<Range<Token>> normalizedRanges, SSTableReader sstable) +{ +if (normalizedRanges == null) +return false; + +RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges); +try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) +{ +while (iter.hasNext()) +{ +DecoratedKey key = iter.next(); +try +{ +rangeOwnHelper.check(key); +} catch(RuntimeException e) --- End diff -- I will disable Zero Copy streaming for STCS and I've created [CASSANDRA-14586](https://issues.apache.org/jira/browse/CASSANDRA-14586) to further discuss fixes.
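The `fullyContainedIn` logic above catches a `RuntimeException` per out-of-range key to decide containment. The same decision can be made without exception-based control flow; a hedged sketch using simplified `long` keys and inclusive `{left, right}` ranges in place of Cassandra's `KeyIterator`/`Range` machinery:

```java
import java.util.List;

// Sketch: an sstable qualifies for full-file (zero copy) transfer only if
// every key falls inside one of the node's normalized ranges; a single
// out-of-range key disqualifies it and we bail out early.
public class ContainmentSketch
{
    public static boolean fullyContainedIn(List<long[]> normalizedRanges, List<Long> keys)
    {
        if (normalizedRanges == null)
            return false;
        for (long key : keys)
        {
            boolean owned = false;
            for (long[] r : normalizedRanges)
            {
                if (key >= r[0] && key <= r[1])
                {
                    owned = true;
                    break;
                }
            }
            if (!owned)
                return false; // fall back to per-partition streaming
        }
        return true;
    }
}
```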
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204955090 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentInfo --- End diff -- I have refactored this to `ComponentManifest`
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204955043 --- Diff: src/java/org/apache/cassandra/db/lifecycle/LogFile.java --- @@ -66,7 +66,7 @@ private final LogReplicaSet replicas = new LogReplicaSet(); // The transaction records, this set must be ORDER PRESERVING -private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>(); +private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554 --- End diff -- I should've worded the comment differently, but it is a hack to avoid a `ConcurrentModificationException` ([CASSANDRA-14554](https://issues.apache.org/jira/browse/CASSANDRA-14554)). This currently exists in trunk and was not introduced as part of this PR, hence the separate ticket.
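The workaround above relies on two properties of `Collections.synchronizedSet`: it preserves the backing `LinkedHashSet`'s insertion order (the "ORDER PRESERVING" requirement), and it makes individual operations thread-safe; iteration, however, still requires holding the wrapper's monitor, which is why this is a stopgap rather than a full fix. A minimal sketch:

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Demonstrates the synchronizedSet(LinkedHashSet) pattern: single operations
// are synchronized for you, insertion order is preserved, but callers must
// synchronize on the wrapper themselves while iterating.
public class SynchronizedSetDemo
{
    public static Set<String> makeRecords()
    {
        Set<String> records = Collections.synchronizedSet(new LinkedHashSet<>());
        records.add("ADD");    // individual adds need no external locking
        records.add("COMMIT");
        return records;
    }

    public static String joinInOrder(Set<String> records)
    {
        StringBuilder sb = new StringBuilder();
        // Iteration is NOT covered by the wrapper's internal synchronization.
        synchronized (records)
        {
            for (String r : records)
                sb.append(r).append(',');
        }
        return sb.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(joinInOrder(makeRecords()));
    }
}
```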
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204954805 --- Diff: src/java/org/apache/cassandra/db/streaming/CassandraBlockStreamReader.java --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.SerializationHeader; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.Version; +import org.apache.cassandra.io.sstable.format.big.BigTableBlockWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.Collectors3; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraBlockStreamReader reads SSTable off the wire and writes it to disk. + */ +public class CassandraBlockStreamReader implements IStreamReader +{ +private static final Logger logger = LoggerFactory.getLogger(CassandraBlockStreamReader.class); +protected final TableId tableId; +protected final StreamSession session; +protected final int sstableLevel; --- End diff -- Cleaned up the protected fields. I still need to reset the level.
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204954749 --- Diff: src/java/org/apache/cassandra/config/DatabaseDescriptor.java --- @@ -2260,6 +2260,20 @@ public static int getStreamingConnectionsPerHost() return conf.streaming_connections_per_host; } +public static boolean isFullSSTableTransfersEnabled() +{ +if (conf.server_encryption_options.enabled || conf.server_encryption_options.optional) +{ +logger.debug("Internode encryption enabled. Disabling zero copy SSTable transfers for streaming."); --- End diff -- Done
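The gate in the diff above can be reduced to a small pure predicate, which also makes it testable without a full `DatabaseDescriptor`. This is a hedged sketch: the parameter names and the idea of extracting the predicate are assumptions, not Cassandra's real config API.

```java
// Sketch of the zero-copy gate: full-sstable (zero copy) transfers are only
// allowed when internode encryption is fully off, since the raw-file fast
// path would bypass TLS; otherwise fall back to regular streaming.
public class TransferGateSketch
{
    public static boolean isFullSSTableTransfersEnabled(boolean encryptionEnabled,
                                                        boolean encryptionOptional,
                                                        boolean configFlag)
    {
        if (encryptionEnabled || encryptionOptional)
            return false; // encryption present (or optional) disables zero copy
        return configFlag; // otherwise defer to the operator's setting
    }
}
```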
[GitHub] cassandra pull request #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/239#discussion_r204954729 --- Diff: src/java/org/apache/cassandra/db/streaming/ComponentInfo.java --- @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public class ComponentInfo +{ +final Component.Type type; +final long length; + +public ComponentInfo(Component.Type type, long length) +{ +assert length >= 0 : "Component length cannot be negative"; +this.type = type; +this.length = length; +} + +@Override +public String toString() +{ +return "ComponentInfo{" + + "type=" + type + + ", length=" + length + + '}'; +} + +public boolean equals(Object o) --- End diff -- `CassandraStreamHeader`'s `equals()` and `hashCode()` methods are not dead code.
They're used in `CassandraStreamHeaderTest::serializerTest`, which eventually ends up in [`SerializationUtils::assertSerializationCycle`](https://github.com/apache/cassandra/blob/9714a7c817b64a3358f69e536535c756c5c6df48/test/unit/org/apache/cassandra/serializers/SerializationUtils.java#L60). This is what originally forced me to implement those methods. The only other choice was to test equality for `ComponentInfo` in `CassandraStreamHeader::equals()`, which would have been a bad choice.
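The point above is that a serialize/deserialize round-trip can only be asserted if the value type defines `equals()`/`hashCode()`. A self-contained sketch of that pattern, with `ComponentInfoSketch` as a simplified stand-in (an `int` type tag and a `long` length) rather than Cassandra's actual `ComponentInfo` or its `IVersionedSerializer`:

```java
import java.nio.ByteBuffer;

// Value type whose equals()/hashCode() exist precisely so a serialization
// round-trip can be verified: deserialize(serialize(x)).equals(x).
public class ComponentInfoSketch
{
    final int type;
    final long length;

    public ComponentInfoSketch(int type, long length)
    {
        assert length >= 0 : "Component length cannot be negative";
        this.type = type;
        this.length = length;
    }

    public ByteBuffer serialize()
    {
        ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + Long.BYTES);
        out.putInt(type).putLong(length).flip();
        return out;
    }

    public static ComponentInfoSketch deserialize(ByteBuffer in)
    {
        return new ComponentInfoSketch(in.getInt(), in.getLong());
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof ComponentInfoSketch))
            return false;
        ComponentInfoSketch other = (ComponentInfoSketch) o;
        return type == other.type && length == other.length;
    }

    @Override
    public int hashCode()
    {
        return 31 * type + Long.hashCode(length);
    }
}
```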
[GitHub] cassandra issue #239: [CASSANDRA-14556] Optimize Streaming
Github user dineshjoshi commented on the issue: https://github.com/apache/cassandra/pull/239 @iamaleksey I have resolved most of your comments. I still have a couple to go. I will update this PR when I am done with those and get a clean dtest run.