[GitHub] cassandra pull request #281: Sep worker shutdown
Github user ifesdjeen closed the pull request at: https://github.com/apache/cassandra/pull/281 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #175: Fix: Page width exceeding the viewport width
Github user asfgit closed the pull request at: https://github.com/apache/cassandra/pull/175 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #:
Github user michaelsembwever commented on the pull request: https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847255 In doc/source/operating/read_repair.rst on line 24: "fatest replica"? That's not objectively true. Maybe just drop the "fastest"? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #:
Github user michaelsembwever commented on the pull request: https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847253 In doc/source/operating/read_repair.rst on line 26: Is this accurate? Read repairs initially ask just those replicas they got digests from (hence matching the consistency level of the request) for full data responses, but they can request additional replicas (a la speculative execution)? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #:
Github user michaelsembwever commented on the pull request: https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847204 In doc/source/operating/hints.rst on line 36: I would drop all mention of `ANY`. It's not a consistency level that the C* community wants to announce anymore, afaik. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #:
Github user michaelsembwever commented on the pull request: https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847200 In doc/source/operating/hints.rst on line 22: This sentence could be more exact. Hints don't "ensure consistency", per se. Rather, they reduce inconsistencies at rest (on disk). As explained below, hints take time to replay, so nothing is immediate or guaranteed. But in practice they minimise inconsistencies on disk when nodes are not down for longer than the max hint window. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #297: Update URLs for gitbox.apache.org migration
GitHub user mshuler opened a pull request: https://github.com/apache/cassandra/pull/297 Update URLs for gitbox.apache.org migration You can merge this pull request into a Git repository by running: $ git pull https://github.com/mshuler/cassandra git-wip2gitbox Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/297.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #297 commit 9fd6659633f87b32fde2704346ac156806d19a67 Author: Michael Shuler Date: 2019-01-04T20:50:29Z Update URLs for gitbox.apache.org migration --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #296: Missing 'hppc' transitive dependency in 'cassan...
GitHub user jojczykp opened a pull request: https://github.com/apache/cassandra/pull/296 Missing 'hppc' transitive dependency in 'cassandra-all' When I run: `$ ant mvn-install` I get, as expected, cassandra-all-4.0-SNAPSHOT generated and installed in my local repo. Unfortunately, to make use of it as a dependency in my gradle project, I also have to add `com.carrotsearch:hppc:0.5.4` as a dependency in that project. As hppc is required by cassandra, I would have expected it to be automatically resolved as a dependency and to appear in my project without any extra effort (the same way it works for all other cassandra deps, like joda-time, sigar, etc.). This PR makes it work :) You can merge this pull request into a Git repository by running: $ git pull https://github.com/jojczykp/cassandra missing-hppc-transitive-dependency-in-produced-pom Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/296.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #296 commit 57f8fa1ff77aebedc397b33f06f83ba1e6c4690b Author: jojczykp Date: 2018-12-20T22:23:37Z Missing 'hppc' transitive dependency in 'cassandra-all' generated with 'maven-install' --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r241232030 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; --- End diff -- From my brief reading of the JNI Bindings it seems that zstd-jni uses `ZSTD_compress` and not `ZSTD_compressCCtx`. The latter allows you to pass in a context which would've been more efficient. For now, I think it would be ok to simply pass in the compression level. The only value of keeping multiple compressor objects around is to retain the `CompressionParams`. It would be useful to have a caching factory to avoid creating multiple objects with the same compression params. As far as thread safety goes, the JNI Code looks thread safe specifically compress and decompress methods. As pointed out earlier they create and destroy the Compression Context on each invocation which is memory unfriendly. So, although the zstd benchmarks may look great, I am not so sure about this JNI binding. We should definitely add a JMH Benchmark for zstd. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
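[Editor's note] Below is a minimal sketch of the JMH microbenchmark suggested above, assuming it would sit alongside Cassandra's existing microbenchmarks in test/microbench. The class name, the 64 KiB chunk size and level 3 are illustrative assumptions, and random input understates real compression ratios, so representative SSTable chunks would be needed for meaningful numbers.

    package org.apache.cassandra.test.microbench;

    import java.util.concurrent.ThreadLocalRandom;

    import com.github.luben.zstd.Zstd;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.Setup;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    public class ZstdJniBench
    {
        private static final int CHUNK_SIZE = 64 * 1024; // roughly an SSTable chunk
        private static final int LEVEL = 3;              // zstd default level

        private byte[] chunk;       // uncompressed input for the compress benchmark
        private byte[] compressed;  // pre-compressed input for the decompress benchmark

        @Setup
        public void setup()
        {
            chunk = new byte[CHUNK_SIZE];
            ThreadLocalRandom.current().nextBytes(chunk); // random data; swap in real chunks for realistic ratios
            compressed = Zstd.compress(chunk, LEVEL);
        }

        @Benchmark
        public byte[] compress()
        {
            // Each call goes through the JNI binding, so per-call context creation/destruction shows up here
            return Zstd.compress(chunk, LEVEL);
        }

        @Benchmark
        public byte[] decompress()
        {
            return Zstd.decompress(compressed, CHUNK_SIZE);
        }
    }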
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user dineshjoshi commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r241224459 --- Diff: build.xml --- @@ -412,6 +412,7 @@ + --- End diff -- The latest version is `1.3.7`. Let's upgrade to it. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user sushmaad commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240890567 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) --- End diff -- Zstd accepts integer levels from negative infinity up to 22. Levels 1 to 22 are normal levels, ranging from fastest at level 1 to highest compression ratio at level 22. 0 selects the default compression level (which is level 3). Negative levels are a new addition, and are even faster than level 1, getting faster the more negative you go. Definitely negative compression levels can be supported here. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
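[Editor's note] If negative levels were to be supported as suggested above, the validation quoted in the diff might be relaxed along these lines. This is a sketch against the ZSTDCompressor fragment shown here; the lower bound is an assumed placeholder for zstd's most negative "fast" level, not a value taken from this PR.

    // Sketch of a relaxed check inside ZSTDCompressor; MIN_FAST_COMPRESSION is a placeholder bound
    public static final int MIN_FAST_COMPRESSION = -131072; // most negative (fastest) zstd level, assumed
    public static final int BEST_COMPRESSION = 22;          // highest compression ratio

    private static void validateCompressionLevel(int compressionLevel)
    {
        // 0 and negative values are valid: 0 selects zstd's default, negatives trade ratio for speed
        if (compressionLevel < MIN_FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
            throw new IllegalArgumentException("ZSTD compression_level " + compressionLevel + " is out of range");
    }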
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user sushmaad commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240889512 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; --- End diff -- @jolynch @alexott indeed compression_level is currently always set to default, I will refactor this a bit so that each CF will have CompressionParams with dedicated compressor by removing the static reference. Seems to me that there is a tradeoff between having static reference with statefulness based on compression_level versus just creating multiple compressor with each CompressionParams. Having individual compressor might increase the number of compressor objects floating around but we have the flexibility of directly using compression_level without any complicated pre-determined conditions. let me know what you think. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
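[Editor's note] A minimal sketch of the refactoring described above, in which create() returns a compressor carrying the table's configured level instead of the shared static instance. The private constructor and field are illustrative, and the Map<String, String> options signature is assumed; this is not the shape the PR finally took.

    // Sketch of a per-level ZSTDCompressor instead of the static singleton
    private final int compressionLevel;

    private ZSTDCompressor(int compressionLevel)
    {
        this.compressionLevel = compressionLevel;
    }

    public static ZSTDCompressor create(Map<String, String> compressionOptions)
    {
        int level = parseCompressionLevelOption(compressionOptions);
        validateCompressionLevel(level);
        return new ZSTDCompressor(level); // compress() can then read this.compressionLevel directly
    }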
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user sushmaad commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240885996 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); +} + +@Override +public void compress(ByteBuffer input, ByteBuffer output) throws IOException --- End diff -- Yes will do that, that seems to simplify the code here --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #295: add missing commands to nodetool-completion (CA...
GitHub user carlo4002 opened a pull request: https://github.com/apache/cassandra/pull/295 add missing commands to nodetool-completion (CASSANDRA-14916) You can merge this pull request into a Git repository by running: $ git pull https://github.com/carlo4002/cassandra trunk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/295.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #295 commit 6f4c22a6af19421d113432837b5f16b6db830439 Author: jean carlo rivera ura Date: 2018-12-11T08:40:14Z add missing commands to nodetool-completion (CASSANDRA-14916) --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240495812 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); +} + +@Override +public void compress(ByteBuffer input, ByteBuffer output) throws IOException --- End diff -- Indeed, but I think you can simplify this implementation by using the built in `Zstd.compress` method I linked about which afaict does what the code here does. 
--- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
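[Editor's note] A sketch of that simplification, assuming zstd-jni's Zstd.compress(ByteBuffer dst, ByteBuffer src, int level) overload for direct buffers and wrapping its runtime exceptions to keep the ICompressor signature; illustrative only, not the code that was eventually committed.

    @Override
    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        try
        {
            // Compresses from input's position to its limit into output, advancing both buffer positions
            Zstd.compress(output, input, compressionLevel);
        }
        catch (RuntimeException e)
        {
            throw new IOException("ZSTD compress failed", e);
        }
    }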
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user sushmaad commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240494855 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); +} + +@Override +public void compress(ByteBuffer input, ByteBuffer output) throws IOException --- End diff -- ICompressor Interface supports only compress(ByteBuffer input, ByteBuffer output) --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240344277 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; --- End diff -- Sorry, just to clarify this is a Cassandra level thing, not a Zstd level thing. Cassandra calls the `create` method via reflection [here](https://github.com/apache/cassandra/blob/caf50de31b034ed77140b3c1597e7ca6ddc44e17/src/java/org/apache/cassandra/schema/CompressionParams.java#L288-L289), and since the `ICompressors` are "stateless" we can make a reasonable cache using a `ConcurrentHashMap` like [LZ4Compressor](https://github.com/apache/cassandra/blob/06209037ea56b5a2a49615a99f1542d6ea1b2947/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java#L61) does keyed by parameter. This assumes of course that the `Zstd` apis are threadsafe, but I'm pretty confident they are. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
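[Editor's note] A sketch of the cache described above, keyed by compression level the way LZ4Compressor keys its instances by parameters. It assumes the level is the only option worth keying on, builds on a per-level private constructor, and needs a java.util.concurrent.ConcurrentHashMap import.

    // Sketch of a per-level instance cache inside ZSTDCompressor, mirroring LZ4Compressor's approach
    private static final ConcurrentHashMap<Integer, ZSTDCompressor> instances = new ConcurrentHashMap<>();

    public static ZSTDCompressor create(Map<String, String> compressionOptions)
    {
        int level = parseCompressionLevelOption(compressionOptions);
        validateCompressionLevel(level);
        // Reuse one compressor per distinct level; relies on the underlying Zstd calls being thread safe
        return instances.computeIfAbsent(level, ZSTDCompressor::new);
    }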
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240341941 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); --- End diff -- My reading of the [`decompress(ByteBuffer dstBuffer, ByteBuffer srcBuf)`](https://github.com/luben/zstd-jni/blob/c642dc38eddd9142c55644034e6485444a25f052/src/main/java/com/github/luben/zstd/Zstd.java#L879) method used here is that it calls `Zstd.isError` and throws a `RuntimeException`. 
To be consistent with the `ICompressor` contract these methods should afaict throw `IOException` instead. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
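[Editor's note] A sketch of how the ByteBuffer uncompress path could keep the ICompressor contract by translating the binding's runtime exception into an IOException; illustrative only.

    @Override
    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        try
        {
            Zstd.decompress(output, input); // zstd-jni checks Zstd.isError and throws a RuntimeException on failure
        }
        catch (RuntimeException e)
        {
            throw new IOException("ZSTD uncompress failed", e);
        }
    }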
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r240295987 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; +protected final Map samples = new ConcurrentHashMap<>(); -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read in applyConfigChanges +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r240308095 --- Diff: src/java/org/apache/cassandra/net/ResponseVerbHandler.java --- @@ -49,7 +49,7 @@ public void doVerb(MessageIn message, int id) else { //TODO: Should we add latency only in success cases? --- End diff -- Do you think we should remove this TODO while we are here? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239884316 --- Diff: src/java/org/apache/cassandra/net/MessagingService.java --- @@ -729,20 +730,26 @@ void markTimeout(InetAddressAndPort addr) /** * Track latency information for the dynamic snitch * - * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in - * @param address the host that replied to the message - * @param latency + * @param cbthe callback associated with this message -- this lets us know if it's a message type we're interested in + * @param address the host that replied to the message + * @param latencyMicros the number of microseconds to record for this host */ -public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, long latency) +public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, long latencyMicros) { -if (cb.isLatencyForSnitch()) -addLatency(address, latency); +if (cb.latencyMeasurementType() != LatencyMeasurementType.IGNORE) +addLatency(address, latencyMicros, cb.latencyMeasurementType()); } -public void addLatency(InetAddressAndPort address, long latency) +// Used on the local read path +public void addLatency(InetAddressAndPort address, long latencyMicros) +{ +addLatency(address, latencyMicros, LatencyMeasurementType.READ); +} + +private void addLatency(InetAddressAndPort address, long latencyMicros, LatencyMeasurementType usable) --- End diff -- Is it usable or is it a type? It's not a boolean anymore. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239900862 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; +protected final Map samples = new ConcurrentHashMap<>(); -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read in applyConfigChanges +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239906662 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; +protected final Map samples = new ConcurrentHashMap<>(); -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read in applyConfigChanges +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239899040 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; +protected final Map samples = new ConcurrentHashMap<>(); -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read in applyConfigChanges +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r240288561 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; +protected final Map samples = new ConcurrentHashMap<>(); -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read in applyConfigChanges +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r240311669 --- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java --- @@ -18,35 +18,99 @@ package org.apache.cassandra.locator; -import java.io.IOException; +import java.net.UnknownHostException; --- End diff -- My code coverage is a bit wonky today, but is sendPingMessage tested? Is it possible? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239953888 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; +protected final Map samples = new ConcurrentHashMap<>(); -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read in applyConfigChanges +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r240308899 --- Diff: src/java/org/apache/cassandra/service/StorageServiceMBean.java --- @@ -463,21 +463,33 @@ public Map getViewBuildStatusesWithPort(String keyspace, String view); /** - * Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime. + * Change endpointsnitch class and dynamicsnitch class at runtime. * * This method is used to change the snitch implementation and/or dynamic snitch parameters. * If {@code epSnitchClassName} is specified, it will configure a new snitch instance and make it a - * 'dynamic snitch' if {@code dynamic} is specified and {@code true}. + * 'dynamic snitch' if {@code dynamicSnitchClassName} is not null. * - * The parameters {@code dynamicUpdateInterval}, {@code dynamicResetInterval} and {@code dynamicBadnessThreshold} + * The parameters {@code dynamicUpdateInterval}, {@code dynamicSampleUpdateInterval} and {@code dynamicBadnessThreshold} * can be specified individually to update the parameters of the dynamic snitch during runtime. * - * @param epSnitchClassNamethe canonical path name for a class implementing IEndpointSnitch - * @param dynamic boolean that decides whether dynamicsnitch is used or not - only valid, if {@code epSnitchClassName} is specified - * @param dynamicUpdateIntervalinteger, in ms (defaults to the value configured in cassandra.yaml, which defaults to 100) - * @param dynamicResetInterval integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 600,000) - * @param dynamicBadnessThreshold double, (defaults to the value configured in cassandra.yaml, which defaults to 0.0) + * @param epSnitchClassNamethe canonical path name for a class implementing IEndpointSnitch or null. + * If null then no snitch change is made. If an empty string the existing + * Snitch class is used. + * @param dynamicSnitchClassName the canonical path name for a class extending DynamicEndpointSnitch. If + * null while epSnitchClassName is not null, this turns off dynamic snitching; + * otherwise just settings are updated. If an empty string is passed then + * dynamic snitching is kept with the default implementation. + * @param dynamicUpdateIntervalinteger, in ms (defaults to the value configured in cassandra.yaml, which defaults to 100) + * @param dynamicSampleUpdateInterval integer, in ms (defaults to the value configured in cassandra.yaml, which defaults to 1,000) --- End diff -- Documenting defaults in too many locations means the docs are likely to be obsoleted. Such as when you fix the bug with the probe rate. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239939483 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; +protected final Map samples = new ConcurrentHashMap<>(); -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read in applyConfigChanges +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r240299652 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -214,19 +600,27 @@ public String getDatacenter(InetAddressAndPort endpoint) { if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold))) { -return sortedByProximityWithScore(address, replicas); +return sortedByProximityWithScore(address, replicas, aliasedScores); } } return replicas; } // Compare endpoints given an immutable snapshot of the scores -private int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2, Map scores) +public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) --- End diff -- This comment seems wrong and the entire scenario is a little confusing? Why can't this be removed? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r240311277 --- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java --- @@ -18,35 +18,99 @@ package org.apache.cassandra.locator; -import java.io.IOException; +import java.net.UnknownHostException; --- End diff -- It looks like shutting down the DES is covered a little in the settings test, but it would be good if it validated somewhere the futures are cleaned up and there are no scheduled/running tasks in the executors. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
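For context, a minimal JUnit-style sketch of the shutdown assertion being asked for here. The accessors used below (updateScoresScheduler(), updateSamplesScheduler(), samples()) and the newTestSnitch() helper are hypothetical names for illustration, not the PR's actual API:

```java
import static org.junit.Assert.assertTrue;

import org.junit.Test;

public class DynamicEndpointSnitchShutdownTest
{
    @Test
    public void closeCancelsAllScheduledWork() throws Exception
    {
        // newTestSnitch() is a hypothetical factory for a snitch wired up for testing
        DynamicEndpointSnitch des = newTestSnitch();
        des.close();

        // the periodic score/sample tasks must no longer be scheduled
        assertTrue(des.updateScoresScheduler() == null || des.updateScoresScheduler().isCancelled());
        assertTrue(des.updateSamplesScheduler() == null || des.updateSamplesScheduler().isCancelled());

        // every per-endpoint latency probe future must be cancelled or already finished
        des.samples().values().forEach(m -> assertTrue(m.probeFuture == null || m.probeFuture.isDone()));
    }
}
```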
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239884211 --- Diff: src/java/org/apache/cassandra/net/MessagingService.java --- @@ -729,20 +730,26 @@ void markTimeout(InetAddressAndPort addr) /** * Track latency information for the dynamic snitch * - * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in - * @param address the host that replied to the message - * @param latency + * @param cbthe callback associated with this message -- this lets us know if it's a message type we're interested in + * @param address the host that replied to the message + * @param latencyMicros the number of microseconds to record for this host */ -public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, long latency) +public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, long latencyMicros) { -if (cb.isLatencyForSnitch()) -addLatency(address, latency); +if (cb.latencyMeasurementType() != LatencyMeasurementType.IGNORE) +addLatency(address, latencyMicros, cb.latencyMeasurementType()); } -public void addLatency(InetAddressAndPort address, long latency) +// Used on the local read path +public void addLatency(InetAddressAndPort address, long latencyMicros) --- End diff -- It's a bit implicit that this is exclusively for adding read latency? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
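A small sketch of how the local read path could be made explicit, along the lines of this comment. The method name addReadLatency and the enum constant LatencyMeasurementType.READ are illustrative assumptions, not identifiers taken from the patch:

```java
// Hypothetical: a name that states the intent instead of relying on callers knowing
// that only the local read path uses the two-argument overload.
public void addReadLatency(InetAddressAndPort address, long latencyMicros)
{
    // local reads always count as real measurements, never probe traffic;
    // READ is an assumed LatencyMeasurementType constant used only for illustration
    addLatency(address, latencyMicros, LatencyMeasurementType.READ);
}
```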
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r239897722 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -21,133 +21,505 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; -import com.codahale.metrics.Snapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MBeanWrapper; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; --- End diff -- Should probably have a more specific similar to how mbeanRegistered specifies what is registered. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user alexott commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240125195 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); +} + +@Override +public void compress(ByteBuffer input, ByteBuffer output) throws IOException +{ +if (!input.isDirect()) +{ +throw new IllegalArgumentException("input must be a direct buffer"); +} + +if (!output.isDirect()) +{ +throw new IllegalArgumentException("output must be a direct buffer"); +} +long compressedSize = 
Zstd.compressDirectByteBuffer(output, + output.position(), +output.limit() - output.position(), +inp
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user alexott commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240125007 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); --- End diff -- [decompress functions](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/Zstd.java#L189) doesn't throw exceptions - it should be checked the same way as above - with `Zstd.isError`. 
--- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
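If the return-code route is taken, the ByteBuffer path could mirror the byte[] overload shown above. A sketch, assuming the direct-buffer decompress variant reports errors through its returned size the same way Zstd.decompressByteArray does (buffer position bookkeeping omitted):

```java
@Override
public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
    // assumed to return a size on success or an error code checkable with Zstd.isError,
    // mirroring compressDirectByteBuffer used elsewhere in this patch
    long size = Zstd.decompressDirectByteBuffer(output,
                                                output.position(),
                                                output.limit() - output.position(),
                                                input,
                                                input.position(),
                                                input.limit() - input.position());
    if (Zstd.isError(size))
        throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(size));
    // (buffer position updates omitted in this sketch)
}
```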
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user alexott commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240124308 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) --- End diff -- ZStd also supports negative compression levels that are fast but at cost of compression: > The library supports regular compression levels from 1 up to ZSTD_maxCLevel(), which is currently 22. Levels >= 20, labeled `--ultra`, should be used with caution, as they require more memory. The library also offers negative compression levels, which extend the range of speed vs. ratio preferences. The lower the level, the faster the speed (at the cost of compression). I think that it makes sense to add them as well. I think that the maximal negative value is 17... --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
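A sketch of what a widened validation range could look like if negative levels are admitted. The lower bound below is an assumption taken from the reviewer's recollection, not a value confirmed against zstd-jni:

```java
// assumed lower bound for the "fast" negative levels the zstd docs describe
private static final int MIN_FAST_COMPRESSION = -17;
public static final int BEST_COMPRESSION = 22;

private static void validateCompressionLevel(int compressionLevel)
{
    if (compressionLevel < MIN_FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
        throw new IllegalArgumentException("ZSTD compression_level " + compressionLevel + " invalid");
}
```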
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user alexott commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240123941 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; --- End diff -- Yes, right now the configuration parameter isn't used at all... Regarding the ConcurrentHasMap - it makes sense only when [explicit context](http://facebook.github.io/zstd/zstd_manual.html#Chapter5) is used, but this functionality isn't exposed by JNI bindings yet. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240091916 --- Diff: build.xml --- @@ -412,6 +412,7 @@ + --- End diff -- Nit: might be worth grabbing the latest version that was recently released. I think latest is 1.3.7-2 or 3. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240095058 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); --- End diff -- I think this needs to catch the [`RuntimeException`](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/Zstd.java#L895) and rethrow as an `IOException`. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
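The alternative this comment proposes, sketched: keep the call from the patch but surface zstd-jni's unchecked exception as the IOException the ICompressor signature declares:

```java
@Override
public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
{
    try
    {
        Zstd.decompress(output, input);
    }
    catch (RuntimeException e)
    {
        // zstd-jni reports decompression failures on this path as unchecked exceptions
        throw new IOException("ZSTD uncompress failed", e);
    }
}
```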
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240093947 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; --- End diff -- If we want to support different sstables/column families having different levels I think that this needs to be something closer to what the LZ4 compressor does where it creates a `ConcurrentHashMap` of instances keyed by parameter rather than just one instance. In practice I _think_ this will lead to every compressor having the default level (might want to TDD and ensure that different parameters yield different instances). --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
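A sketch of the per-level caching described here, keyed the way LZ4Compressor keys its instances. The private constructor and field layout are assumptions for illustration, and a java.util.concurrent.ConcurrentHashMap import would be needed; parseCompressionLevelOption and validateCompressionLevel are the helpers already present in the patch:

```java
// one compressor instance per configured compression_level
private static final ConcurrentHashMap<Integer, ZSTDCompressor> instances = new ConcurrentHashMap<>();

private final int compressionLevel;

private ZSTDCompressor(int compressionLevel)
{
    this.compressionLevel = compressionLevel;
}

public static ZSTDCompressor create(Map<String, String> compressionOptions)
{
    int level = parseCompressionLevelOption(compressionOptions);
    validateCompressionLevel(level);
    // distinct levels now yield distinct instances instead of one shared singleton
    return instances.computeIfAbsent(level, ZSTDCompressor::new);
}
```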
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240094348 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); +} + +@Override +public void compress(ByteBuffer input, ByteBuffer output) throws IOException +{ +if (!input.isDirect()) +{ +throw new IllegalArgumentException("input must be a direct buffer"); +} + +if (!output.isDirect()) +{ +throw new IllegalArgumentException("output must be a direct buffer"); +} +long compressedSize = 
Zstd.compressDirectByteBuffer(output, + output.position(), +output.limit() - output.position(), +inp
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/292#discussion_r240092674 --- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.compress; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.annotations.VisibleForTesting; +import com.github.luben.zstd.Zstd; + +public class ZSTDCompressor implements ICompressor +{ +public static final int FAST_COMPRESSION = 1; // fastest compression time +public static final int DEFAULT_LEVEL = 3; +public static final int BEST_COMPRESSION = 22;// very good compression ratio +private static final ZSTDCompressor instance = new ZSTDCompressor(); +private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level"; +@VisibleForTesting +protected static final int compressionLevel = DEFAULT_LEVEL; + +public static ZSTDCompressor create(Map compressionOptions) +{ + validateCompressionLevel(parseCompressionLevelOption(compressionOptions)); +return instance; +} + +private static void validateCompressionLevel(int compressionLevel) +{ +if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION) +{ +throw new IllegalArgumentException( +"ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ", +null +); +} +} + +private static int parseCompressionLevelOption(Map compressionOptions) +{ +return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME, + Integer.toString(DEFAULT_LEVEL))); +} + +@Override +public int initialCompressedBufferLength(int chunkLength) +{ +return (int)Zstd.compressBound(chunkLength); +} + +@Override +public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) +throws IOException +{ +long decompressSize = Zstd.decompressByteArray(output, + outputOffset, + output.length - outputOffset, + input, + inputOffset, + inputLength); +if (Zstd.isError(decompressSize)) +{ +throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize)); +} + +return (int) decompressSize; +} + +@Override +public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException +{ +Zstd.decompress(output, input); +} + +@Override +public void compress(ByteBuffer input, ByteBuffer output) throws IOException --- End diff -- I think you may be able to replace this with the [``compress(ByteBuffer dstBuf, ByteBuffer srcBuf, int level)``](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/Zstd.java#L535) method, you just have to 
make sure to pass the arguments in the opposite order (output comes first) and re-throw the `RuntimeException` as an `IOException`. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
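Sketched out, the replacement this comment points at could look like the following, assuming a per-instance compressionLevel field as discussed in the earlier comment:

```java
@Override
public void compress(ByteBuffer input, ByteBuffer output) throws IOException
{
    try
    {
        // destination buffer first, then source, then the configured level
        Zstd.compress(output, input, compressionLevel);
    }
    catch (RuntimeException e)
    {
        throw new IOException("ZSTD compress failed", e);
    }
}
```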
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r238172698 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); --- End diff -- Ok, I added a cancel+wait method to do this, and added in some synchronization around `close`/`open` that I believe is needed to ensure that this method works 100% of the time during snitch updates. This actually fixes a bug I believe exists in trunk where we just hoped that we didn't get an exception on Snitch construction in https://github.com/apache/cassandra/blob/f3609995c09570d523527d9bd0fd69c2bc65d986/src/java/org/apache/cassandra/service/StorageService.java#L4975-L4986 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
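A minimal sketch of a cancel-and-wait helper of the kind described here; the name, timeout argument and interrupt handling are assumptions rather than the patch's actual code (the java.util.concurrent exception types it needs already appear in the patch's import list):

```java
private static void cancelAndWait(Future<?> future, long timeout, TimeUnit unit)
{
    if (future == null)
        return;

    future.cancel(false);
    try
    {
        // block until any in-flight run has finished so no task outlives close()
        future.get(timeout, unit);
    }
    catch (CancellationException | ExecutionException ignored)
    {
        // cancelled before it ran, or failed: either way it is no longer scheduled
    }
    catch (TimeoutException e)
    {
        throw new RuntimeException("Timed out waiting for snitch task to stop", e);
    }
    catch (InterruptedException e)
    {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
```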
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r238169787 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) --- End diff -- This needs to be here so that the bookkeeping clocks keep ticking. Basically we do the calculation but don't send any probes (respecting that disabled gossip means we shouldn't be sending messages). --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r238167792 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,20 +228,101 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScheduler != null) +updateScheduler.cancel(false); +if (latencyProbeScheduler != null) +latencyProbeScheduler.cancel(false); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Determines if latency probes need to be sent, and potentially sends a single latency probe per invocation + */ +protected void maybeSendLatencyProbe() +{ +long currentTimeNS = System.nanoTime(); +markNextAllowedProbeGenerationTime(currentTimeNS); + +Optional needsProbe = latencyProbeNeeded(currentTimeNS); +needsProbe.ifPresent(this::sendPingMessageToPeer); +} + +/** + * Determines which peers need latency at a particular time. Note that this takes currentTimeNS for testability + * of this code path. + * @param currentTimeNS The current time to evaluate. Used mostly for testing. + * @return An Optional that if present contains a host to probe. + */ +@VisibleForTesting +Optional latencyProbeNeeded(long currentTimeNS) { +if (currentProbePosition >= latencyProbeSequence.size() && (currentTimeNS > nextProbeGenerationTime)) +{ +nextProbeGenerationTime = nextAllowedProbeGenerationTime; +latencyProbeSequence.clear(); + +// Delegate to the subclass to actually figure out what the probe sequence should be +updateLatencyProbeSequence(latencyProbeSequence); + +if (latencyProbeSequence.size() > 0) +Collections.shuffle(latencyProbeSequence); + +currentProbePosition = 0; +} + +if (currentProbePosition < latencyProbeSequence.size()) +{ +try +{ +return Optional.of(latencyProbeSequence.get(currentProbePosition++)); +} +catch (IndexOutOfBoundsException ignored) {} +} + +return Optional.empty(); +} + +private void sendPingMessageToPeer(InetAddressAndPort to) +{ +logger.trace("Sending a small and large PingMessage to {}", to); +IAsyncCallback latencyProbeHandler = new IAsyncCallback() --- End diff -- In the latest version I kept the changed API but I can remove it you think it'll make the PR easier to review/merge. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237979844 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) +return; + +schedulePings(samples); +} + +/** + * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint. + * + * The algorithm is as follows: + * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields + *incremented by the passed interval + * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch + *has sent them no traffic) get probes with exponential backoff. + * + * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after + * MAX_PROBE_INTERVAL_MS of no ranking requests as well. + * + * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero + * probeTimerMillis members set. + */ +@VisibleForTesting +static void calculateProbes(Map samples, long intervalMillis) { +for (Map.Entry entry: samples.entrySet()) +{ +if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort())) +continue; + +AnnotatedMeasurement measurement = entry.getValue(); +long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis); +long lastRequest = measurement.millisSinceLastRequest.getAndAdd(intervalMillis); + +if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS) +{ +if (measurement.probeTimerMillis == 0) +{ +measurement.probeTimerMillis = intervalMillis; --- End diff -- It might not be null if we had scheduled a 10 minute probe and then got a real reading. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237978698 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) +return; + +schedulePings(samples); +} + +/** + * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint. + * + * The algorithm is as follows: + * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields + *incremented by the passed interval + * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch + *has sent them no traffic) get probes with exponential backoff. + * + * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after + * MAX_PROBE_INTERVAL_MS of no ranking requests as well. + * + * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero + * probeTimerMillis members set. 
+ */ +@VisibleForTesting +static void calculateProbes(Map samples, long intervalMillis) { +for (Map.Entry entry: samples.entrySet()) +{ +if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort())) +continue; + +AnnotatedMeasurement measurement = entry.getValue(); +long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis); +long lastRequest = measurement.millisSinceLastRequest.getAndAdd(intervalMillis); + +if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS) +{ +if (measurement.probeTimerMillis == 0) +{ +measurement.probeTimerMillis = intervalMillis; +} +else if (measurement.probeFuture != null && measurement.probeFuture.isDone()) +{ +measurement.probeTimerMillis = Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2); +} +} +else +{ +measurement.probeTimerMillis = 0; +} +} +} + +@VisibleForTesting +void schedulePings(Map samples) +{ +for (Map.Entry entry: samples.entrySet()) +{ +AnnotatedMeasurement measurement = entry.getValue(); +long delay = measurement.probeTimerMillis; +long millisSinceLastRequest = measurement.millisSinceLastRequest.get(); + +if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && !Gossiper.instance.isAlive(entry.getKey())) +{ +samples.remove(entry.getKey()); +} + +if (delay > 0 && millisSinc
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237971740 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) +return; + +schedulePings(samples); +} + +/** + * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint. + * + * The algorithm is as follows: + * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields + *incremented by the passed interval + * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch + *has sent them no traffic) get probes with exponential backoff. + * + * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after + * MAX_PROBE_INTERVAL_MS of no ranking requests as well. + * + * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero + * probeTimerMillis members set. + */ +@VisibleForTesting +static void calculateProbes(Map samples, long intervalMillis) { +for (Map.Entry entry: samples.entrySet()) +{ +if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort())) +continue; + +AnnotatedMeasurement measurement = entry.getValue(); +long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis); --- End diff -- The messaging rate is so low let's punt on the metric for now? I'm happy to add it as a follow up patch but I'd like to get the pluggability and probes in first. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237961057 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) +return; + +schedulePings(samples); +} + +/** + * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint. + * + * The algorithm is as follows: + * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields + *incremented by the passed interval + * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch + *has sent them no traffic) get probes with exponential backoff. + * + * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after + * MAX_PROBE_INTERVAL_MS of no ranking requests as well. + * + * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero + * probeTimerMillis members set. 
+ */ +@VisibleForTesting +static void calculateProbes(Map samples, long intervalMillis) { +for (Map.Entry entry: samples.entrySet()) +{ +if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort())) +continue; + +AnnotatedMeasurement measurement = entry.getValue(); +long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis); +long lastRequest = measurement.millisSinceLastRequest.getAndAdd(intervalMillis); + +if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS) +{ +if (measurement.probeTimerMillis == 0) +{ +measurement.probeTimerMillis = intervalMillis; +} +else if (measurement.probeFuture != null && measurement.probeFuture.isDone()) +{ +measurement.probeTimerMillis = Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2); +} +} +else +{ +measurement.probeTimerMillis = 0; +} +} +} + +@VisibleForTesting +void schedulePings(Map samples) +{ +for (Map.Entry entry: samples.entrySet()) +{ +AnnotatedMeasurement measurement = entry.getValue(); +long delay = measurement.probeTimerMillis; +long millisSinceLastRequest = measurement.millisSinceLastRequest.get(); + +if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && !Gossiper.instance.isAlive(entry.getKey())) +{ +samples.remove(entry.getKey()); +} + +if (delay > 0 && millisSinc
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237960721 --- Diff: src/java/org/apache/cassandra/service/StorageService.java --- @@ -4940,42 +4940,61 @@ public int getDynamicUpdateInterval() return DatabaseDescriptor.getDynamicUpdateInterval(); } +@Override public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException +{ +updateSnitch(epSnitchClassName, dynamic ? "" : null, dynamicUpdateInterval, null, dynamicBadnessThreshold); +} + +@Override +public void updateSnitch(String epSnitchClassName, String dynamicSnitchClassName, Integer dynamicUpdateInterval, Integer dynamicSampleUpdateInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException { // apply dynamic snitch configuration if (dynamicUpdateInterval != null) DatabaseDescriptor.setDynamicUpdateInterval(dynamicUpdateInterval); -if (dynamicResetInterval != null) - DatabaseDescriptor.setDynamicResetInterval(dynamicResetInterval); if (dynamicBadnessThreshold != null) DatabaseDescriptor.setDynamicBadnessThreshold(dynamicBadnessThreshold); +if (dynamicSampleUpdateInterval != null) --- End diff -- That's what I figured, also makes testing a lot easier. I added a bunch of tests of the various swaps as well. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237960560 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,20 +310,155 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScheduler != null) +updateScheduler.cancel(false); +if (latencyProbeScheduler != null) +latencyProbeScheduler.cancel(false); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Determines if latency probes need to be sent, and potentially sends a single latency probe per invocation + */ +protected void maybeSendLatencyProbe() +{ +if (!StorageService.instance.isGossipActive()) +return; + +currentProbePosition = latencyProbeNeeded(samples, latencyProbeSequence, currentProbePosition); + +if (currentProbePosition < latencyProbeSequence.size()) +{ +try +{ +InetAddressAndPort peer = latencyProbeSequence.get(currentProbePosition); +sendPingMessageToPeer(peer); +} +catch (IndexOutOfBoundsException ignored) {} +} +} + +/** + * This method (unfortunately) mutates a lot of state so that it doesn't create any garbage and only iterates the + * sample map a single time . In particular on every call we: + * - increment every sample's intervalsSinceLastMeasure + * + * When probes should be generated we also potentially: + * - reset sample's recentlyRequested that have reached the "CONSTANT" phase of probing (10 minutes by default) + * - add any InetAddressAndPort's that need probing to the provided endpointsToProbe + * - shuffle the endpointsToProbe + * + * If there are probes to be sent, this method short circuits all generation of probes and just returns the + * passed probePosition plus one. + * @return The position of the passed endpointsToProbe that should be probed. + */ +@VisibleForTesting +int latencyProbeNeeded(Map samples, + List endpointsToProbe, int probePosition) { +boolean shouldGenerateProbes = (probePosition >= endpointsToProbe.size()); + +if (shouldGenerateProbes) +{ +endpointsToProbe.clear(); +samples.keySet().retainAll(Gossiper.instance.getLiveMembers()); +} + +// We have to increment intervalsSinceLastMeasure regardless of if we generate probes +for (Map.Entry entry: samples.entrySet()) +{ +AnnotatedMeasurement measurement = entry.getValue(); +long intervalsSinceLastMeasure = measurement.intervalsSinceLastMeasure.getAndIncrement(); --- End diff -- I believe that I've addressed this feedback, although I didn't end up looking at the futures in receiveTiming since that's performance critical (we'll maybe send an extra probe, not a big deal imo) --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237958792 --- Diff: src/java/org/apache/cassandra/locator/dynamicsnitch/DynamicEndpointSnitchEMA.java --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator.dynamicsnitch; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.locator.DynamicEndpointSnitch; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.metrics.ExponentialMovingAverage; + + +/** + * A dynamic snitching implementation that uses Exponentially Moving Averages as a low pass filter to prefer + * or de-prefer hosts + * + * This implementation generates a few orders of magnitude less garbage than histograms and is close to 10x faster, + * but as it is not a Median LPF (it is an Average LPF), it is more vulnerable to noise. This may be acceptable but + * given the significant change in behavior this is not the default in 4.0 + */ +public class DynamicEndpointSnitchEMA extends DynamicEndpointSnitch +{ +// A ~10 sample EMA heavily weighted to the past values to minimize noise +private static final double EMA_ALPHA = 0.10; + +protected static class EMASnitchMeasurement implements ISnitchMeasurement +{ +public final ExponentialMovingAverage avg; + +EMASnitchMeasurement(double initial) +{ +avg = new ExponentialMovingAverage(EMA_ALPHA, initial); +} + +@Override +public void sample(long value) +{ +avg.update(value); +} + +@Override +public double measure() +{ +return avg.getAvg(); +} + +@Override +public Iterable measurements() +{ +return Collections.singletonList(avg.getAvg()); +} +} + +// Called via reflection +public DynamicEndpointSnitchEMA(IEndpointSnitch snitch) +{ +this(snitch, "ema"); +} + +public DynamicEndpointSnitchEMA(IEndpointSnitch snitch, String instance) +{ +super(snitch, instance); +} + +@Override +protected ISnitchMeasurement measurementImpl(long initialValue) +{ +return new EMASnitchMeasurement(initialValue); +} + +/** + * Unlike the Histogram implementation, calling this measure method is reasonably cheap (doesn't require a + * Snapshot or anything) so we can skip a round of iterations and just normalize the scores slightly + * differently + */ +@Override +public Map calculateScores() +{ +// We're going to weight the latency for each host against the worst one we see, to +// arrive at sort of a 'badness percentage' for them. 
First, find the worst for each: +HashMap newScores = new HashMap<>(samples.size()); +Optional maxObservedAvgLatency = samples.values().stream() --- End diff -- I went ahead and removed the EMA entirely, let's follow up on that in another change. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
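For reference, a rough sketch of the two ideas in the quoted (and, per the comment above, deferred) EMA snitch: an exponentially weighted moving average used as a cheap low-pass filter, and normalising each host's average latency against the worst observed average to arrive at a "badness" score. The Ema class below is a simplified stand-in, not the real ExponentialMovingAverage:
```java
import java.util.HashMap;
import java.util.Map;

public class EmaScoreSketch
{
    static final double EMA_ALPHA = 0.10; // roughly a 10-sample window, heavily weighted toward past values

    static final class Ema
    {
        private double avg;
        Ema(double initial) { avg = initial; }
        void update(double value) { avg = EMA_ALPHA * value + (1 - EMA_ALPHA) * avg; }
        double get() { return avg; }
    }

    /** Normalise every host's average latency against the worst one, yielding scores in (0, 1]. */
    static Map<String, Double> scores(Map<String, Ema> samples)
    {
        double worst = samples.values().stream().mapToDouble(Ema::get).max().orElse(1.0);
        Map<String, Double> scores = new HashMap<>();
        for (Map.Entry<String, Ema> e : samples.entrySet())
            scores.put(e.getKey(), e.getValue().get() / worst);
        return scores;
    }

    public static void main(String[] args)
    {
        Map<String, Ema> samples = new HashMap<>();
        samples.put("10.0.0.1", new Ema(5.0));
        samples.put("10.0.0.2", new Ema(5.0));
        samples.get("10.0.0.2").update(50.0); // one slow 50ms reading only moves the average to 9.5ms
        System.out.println(scores(samples));  // host .2 now scores worse than host .1
    }
}
```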
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r237956751 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) +return; + +schedulePings(samples); +} + +/** + * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint. + * + * The algorithm is as follows: + * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields + *incremented by the passed interval + * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch + *has sent them no traffic) get probes with exponential backoff. + * + * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after + * MAX_PROBE_INTERVAL_MS of no ranking requests as well. + * + * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero + * probeTimerMillis members set. 
+ */ +@VisibleForTesting +static void calculateProbes(Map samples, long intervalMillis) { +for (Map.Entry entry: samples.entrySet()) +{ +if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort())) +continue; + +AnnotatedMeasurement measurement = entry.getValue(); +long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis); +long lastRequest = measurement.millisSinceLastRequest.getAndAdd(intervalMillis); + +if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS) +{ +if (measurement.probeTimerMillis == 0) +{ +measurement.probeTimerMillis = intervalMillis; +} +else if (measurement.probeFuture != null && measurement.probeFuture.isDone()) +{ +measurement.probeTimerMillis = Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2); +} +} +else +{ +measurement.probeTimerMillis = 0; +} +} +} + +@VisibleForTesting +void schedulePings(Map samples) +{ +for (Map.Entry entry: samples.entrySet()) +{ +AnnotatedMeasurement measurement = entry.getValue(); +long delay = measurement.probeTimerMillis; +long millisSinceLastRequest = measurement.millisSinceLastRequest.get(); + +if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && !Gossiper.instance.isAlive(entry.getKey())) +{ +samples.remove(entry.getKey()); +} + +if (delay > 0 && millisSinc
[GitHub] cassandra pull request #294: Fixing logging of netty client related ioexcept...
Github user sumanth-pasupuleti commented on a diff in the pull request: https://github.com/apache/cassandra/pull/294#discussion_r237562392 --- Diff: src/java/org/apache/cassandra/transport/Message.java --- @@ -710,7 +710,7 @@ public boolean apply(Throwable exception) boolean isIOException = exception instanceof IOException || (exception.getCause() instanceof IOException); if (!alwaysLogAtError && isIOException) { -if (ioExceptionsAtDebugLevel.contains(exception.getMessage())) +if (ioExceptionsAtDebugLevel.stream().anyMatch(ioExceptionAtDebugLevel -> exception.getMessage().contains(ioExceptionAtDebugLevel))) --- End diff -- Thank you, I was under the delusion that the streaming API is better than a for loop. Fixed. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #294: Fixing logging of netty client related ioexcept...
Github user jasobrown commented on a diff in the pull request: https://github.com/apache/cassandra/pull/294#discussion_r237509397 --- Diff: src/java/org/apache/cassandra/transport/Message.java --- @@ -710,7 +710,7 @@ public boolean apply(Throwable exception) boolean isIOException = exception instanceof IOException || (exception.getCause() instanceof IOException); if (!alwaysLogAtError && isIOException) { -if (ioExceptionsAtDebugLevel.contains(exception.getMessage())) +if (ioExceptionsAtDebugLevel.stream().anyMatch(ioExceptionAtDebugLevel -> exception.getMessage().contains(ioExceptionAtDebugLevel))) --- End diff -- I'd prefer to avoid the streaming API as it creates a bunch of excess garbage. How about something like this instead:
```java
// exceptions thrown from the netty epoll transport add the name of the function that failed
// to the exception string (which is simply wrapping a JDK exception), so we can't do a simple/naive comparison
String errorMessage = exception.getMessage();
boolean logAtTrace = false;

for (String s : ioExceptionsAtDebugLevel)
{
    if (errorMessage.contains(s))
    {
        logAtTrace = true;
        break;
    }
}

if (logAtTrace)
{
    // Likely unclean client disconnects
    logger.trace(message, exception);
}
else
{
    // Generally unhandled IO exceptions are network issues, not actual ERRORS
    logger.info(message, exception);
}
```
--- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r236776225 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) +return; + +schedulePings(samples); +} + +/** + * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint. + * + * The algorithm is as follows: + * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields + *incremented by the passed interval + * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch + *has sent them no traffic) get probes with exponential backoff. + * + * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after + * MAX_PROBE_INTERVAL_MS of no ranking requests as well. + * + * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero + * probeTimerMillis members set. + */ +@VisibleForTesting +static void calculateProbes(Map samples, long intervalMillis) { +for (Map.Entry entry: samples.entrySet()) +{ +if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort())) +continue; + +AnnotatedMeasurement measurement = entry.getValue(); +long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis); +long lastRequest = measurement.millisSinceLastRequest.getAndAdd(intervalMillis); + +if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS) +{ +if (measurement.probeTimerMillis == 0) +{ +measurement.probeTimerMillis = intervalMillis; +} +else if (measurement.probeFuture != null && measurement.probeFuture.isDone()) +{ +measurement.probeTimerMillis = Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2); --- End diff -- Ack, +1 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r236775944 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -23,128 +23,300 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.codahale.metrics.Snapshot; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); - -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; - -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); +protected final Map samples = new ConcurrentHashMap<>(); + +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY); --- End diff --
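For reference, the probe interval constants in the quoted diff are read with Long.getLong, so (assuming those property names survive review unchanged) they could be overridden at JVM startup with -D system properties. A small sketch of how those defaults resolve:
```java
public class ProbeIntervalDefaults
{
    public static void main(String[] args)
    {
        // Long.getLong(name, default) returns the system property value when set
        // (e.g. started with -Dcassandra.dynamic_snitch_max_probe_interval_ms=300000),
        // otherwise the compiled-in default shown in the quoted diff.
        long maxProbeIntervalMs = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
        long minProbeIntervalMs = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L);
        System.out.println("max=" + maxProbeIntervalMs + "ms min=" + minProbeIntervalMs + "ms");
    }
}
```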
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r236765727 --- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java --- @@ -19,34 +19,104 @@ package org.apache.cassandra.locator; import java.io.IOException; +import java.net.UnknownHostException; import java.util.*; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchEMA; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchLegacyHistogram; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.async.TestScheduledFuture; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) public class DynamicEndpointSnitchTest { +private static InetAddressAndPort[] hosts; +// Reduce the update interval significantly so that tests run quickly +private static final long UPDATE_INTERVAL_MS = 10; +// Intentially 31 and a little bit instead of 30 seconds flat so this doesn't divide evenly into the default +// MAX_PROBE_INTERVAL_MS. Also pretty high so latency probes don't interfere with the unit tests +private static final long PING_INTERVAL_MS = 31 * 1003; + +private final DynamicEndpointSnitch dsnitch; + +public DynamicEndpointSnitchTest(DynamicEndpointSnitch dsnitch) +{ +this.dsnitch = dsnitch; +} + +@Before +public void prepareDES() +{ +for (InetAddressAndPort host : hosts) +{ +Gossiper.instance.initializeNodeUnsafe(host, UUID.randomUUID(), 1); +Gossiper.instance.realMarkAlive(host, Gossiper.instance.getEndpointStateForEndpoint(host)); +} +dsnitch.reset(); +} -@BeforeClass -public static void setupDD() +@Parameterized.Parameters(name="{index}: {0}") +public static Iterable getDESImplementation() throws UnknownHostException { DatabaseDescriptor.daemonInitialization(); +// do this because SS needs to be initialized before DES can work properly. 
+StorageService.instance.unsafeInitialize(); + +hosts = new InetAddressAndPort[] { +FBUtilities.getBroadcastAddressAndPort(), +InetAddressAndPort.getByName("127.0.0.2"), +InetAddressAndPort.getByName("127.0.0.3"), +InetAddressAndPort.getByName("127.0.0.4"), +InetAddressAndPort.getByName("127.0.0.5"), +}; + +SimpleSnitch ss1 = new SimpleSnitch(); +DynamicEndpointSnitch probeDES = new DynamicEndpointSnitchHistogram(ss1, String.valueOf(ss1.hashCode())); +probeDES.applyConfigChanges((int) UPDATE_INTERVAL_MS, (int) PING_INTERVAL_MS, DatabaseDescriptor.getDynamicBadnessThreshold()); + +SimpleSnitch ss2 = new SimpleSnitch(); +DynamicEndpointSnitch oldDES = new DynamicEndpointSnitchLegacyHistogram(ss2, String.valueOf(ss2.hashCode())); +oldDES.applyConfigChanges((int) UPDATE_INTERVAL_MS, (int) PING_INTERVAL_MS, DatabaseDescriptor.getDynamicBadnessThreshold()); + +SimpleSnitch ss3 = new SimpleSnitch(); +DynamicEndpointSnitch emaDES = new DynamicEndpointSnitchEMA(ss3, String.valueOf(ss3.hashCode())); +emaDES.applyConfigChanges((int) UPDATE_INTERVAL_MS, (int) PING_INTERVAL_MS, DatabaseDescriptor.getDynamicBadnessThreshold()); + +return Arrays.asList(probeDES, oldDES, emaDES); +} + +@After +public void resetDES() +{ +dsnitch.reset(); } private static void setScores(DynamicEndpointSnitch dsnitch, int rounds, List hosts, Integer... scores) throws InterruptedException { for (int round = 0; round < rounds; round++) { for (int i = 0; i < hosts.size(); i++) -dsnitch.receiveTiming(hosts.get(i), scores[i]); +dsnitch.receiveTiming(hosts.get(i), scores[i], LatencyM
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r236764154 --- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java --- @@ -60,57 +130,229 @@ private static EndpointsForRange full(InetAddressAndPort... endpoints) } @Test -public void testSnitch() throws InterruptedException, IOException, ConfigurationException +public void testSortedByProximity() throws InterruptedException, IOException, ConfigurationException { -// do this because SS needs to be initialized before DES can work properly. -StorageService.instance.unsafeInitialize(); -SimpleSnitch ss = new SimpleSnitch(); -DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); -InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); -InetAddressAndPort host1 = InetAddressAndPort.getByName("127.0.0.2"); -InetAddressAndPort host2 = InetAddressAndPort.getByName("127.0.0.3"); -InetAddressAndPort host3 = InetAddressAndPort.getByName("127.0.0.4"); -InetAddressAndPort host4 = InetAddressAndPort.getByName("127.0.0.5"); -List hosts = Arrays.asList(host1, host2, host3); +InetAddressAndPort self = hosts[0]; +List allHosts = Arrays.asList(hosts[1], hosts[2], hosts[3]); // first, make all hosts equal -setScores(dsnitch, 1, hosts, 10, 10, 10); -EndpointsForRange order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host1 a little worse -setScores(dsnitch, 1, hosts, 20, 10, 10); -order = full(host2, host3, host1); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host2 as bad as host1 -setScores(dsnitch, 2, hosts, 15, 20, 10); -order = full(host3, host1, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 the worst -setScores(dsnitch, 3, hosts, 10, 10, 30); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 equal to the others -setScores(dsnitch, 5, hosts, 10, 10, 10); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +setScores(dsnitch, 1, allHosts, 10, 10, 10); +EndpointsForRange order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[1] a little worse +setScores(dsnitch, 2, allHosts, 20, 10, 10); +order = full(hosts[2], hosts[3], hosts[1]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[2] as bad as hosts[1] +setScores(dsnitch, 4, allHosts, 15, 20, 10); +order = full(hosts[3], hosts[1], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] the worst +setScores(dsnitch, 10, allHosts, 10, 10, 30); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] equal to the others +setScores(dsnitch, 15, allHosts, 10, 10, 10); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); /// Tests CASSANDRA-6683 improvements // make the scores differ enough from the ideal order that we sort by score; under the old // dynamic snitch behavior 
(where we only compared neighbors), these wouldn't get sorted -setScores(dsnitch, 20, hosts, 10, 70, 20); -order = full(host1, host3, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +dsnitch.reset(); +setScores(dsnitch, 20, allHosts, 10, 70, 20); +order = full(hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +order = full(hosts[4], hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(s
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r236763554 --- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java --- @@ -60,57 +130,229 @@ private static EndpointsForRange full(InetAddressAndPort... endpoints) } @Test -public void testSnitch() throws InterruptedException, IOException, ConfigurationException +public void testSortedByProximity() throws InterruptedException, IOException, ConfigurationException { -// do this because SS needs to be initialized before DES can work properly. -StorageService.instance.unsafeInitialize(); -SimpleSnitch ss = new SimpleSnitch(); -DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); -InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); -InetAddressAndPort host1 = InetAddressAndPort.getByName("127.0.0.2"); -InetAddressAndPort host2 = InetAddressAndPort.getByName("127.0.0.3"); -InetAddressAndPort host3 = InetAddressAndPort.getByName("127.0.0.4"); -InetAddressAndPort host4 = InetAddressAndPort.getByName("127.0.0.5"); -List hosts = Arrays.asList(host1, host2, host3); +InetAddressAndPort self = hosts[0]; +List allHosts = Arrays.asList(hosts[1], hosts[2], hosts[3]); // first, make all hosts equal -setScores(dsnitch, 1, hosts, 10, 10, 10); -EndpointsForRange order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host1 a little worse -setScores(dsnitch, 1, hosts, 20, 10, 10); -order = full(host2, host3, host1); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host2 as bad as host1 -setScores(dsnitch, 2, hosts, 15, 20, 10); -order = full(host3, host1, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 the worst -setScores(dsnitch, 3, hosts, 10, 10, 30); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 equal to the others -setScores(dsnitch, 5, hosts, 10, 10, 10); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +setScores(dsnitch, 1, allHosts, 10, 10, 10); +EndpointsForRange order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[1] a little worse +setScores(dsnitch, 2, allHosts, 20, 10, 10); +order = full(hosts[2], hosts[3], hosts[1]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[2] as bad as hosts[1] +setScores(dsnitch, 4, allHosts, 15, 20, 10); +order = full(hosts[3], hosts[1], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] the worst +setScores(dsnitch, 10, allHosts, 10, 10, 30); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] equal to the others +setScores(dsnitch, 15, allHosts, 10, 10, 10); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); /// Tests CASSANDRA-6683 improvements // make the scores differ enough from the ideal order that we sort by score; under the old // dynamic snitch behavior 
(where we only compared neighbors), these wouldn't get sorted -setScores(dsnitch, 20, hosts, 10, 70, 20); -order = full(host1, host3, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +dsnitch.reset(); +setScores(dsnitch, 20, allHosts, 10, 70, 20); +order = full(hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +order = full(hosts[4], hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(s
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r236761954 --- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java --- @@ -60,57 +130,229 @@ private static EndpointsForRange full(InetAddressAndPort... endpoints) } @Test -public void testSnitch() throws InterruptedException, IOException, ConfigurationException +public void testSortedByProximity() throws InterruptedException, IOException, ConfigurationException { -// do this because SS needs to be initialized before DES can work properly. -StorageService.instance.unsafeInitialize(); -SimpleSnitch ss = new SimpleSnitch(); -DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); -InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); -InetAddressAndPort host1 = InetAddressAndPort.getByName("127.0.0.2"); -InetAddressAndPort host2 = InetAddressAndPort.getByName("127.0.0.3"); -InetAddressAndPort host3 = InetAddressAndPort.getByName("127.0.0.4"); -InetAddressAndPort host4 = InetAddressAndPort.getByName("127.0.0.5"); -List hosts = Arrays.asList(host1, host2, host3); +InetAddressAndPort self = hosts[0]; +List allHosts = Arrays.asList(hosts[1], hosts[2], hosts[3]); // first, make all hosts equal -setScores(dsnitch, 1, hosts, 10, 10, 10); -EndpointsForRange order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host1 a little worse -setScores(dsnitch, 1, hosts, 20, 10, 10); -order = full(host2, host3, host1); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host2 as bad as host1 -setScores(dsnitch, 2, hosts, 15, 20, 10); -order = full(host3, host1, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 the worst -setScores(dsnitch, 3, hosts, 10, 10, 30); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 equal to the others -setScores(dsnitch, 5, hosts, 10, 10, 10); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +setScores(dsnitch, 1, allHosts, 10, 10, 10); +EndpointsForRange order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[1] a little worse +setScores(dsnitch, 2, allHosts, 20, 10, 10); +order = full(hosts[2], hosts[3], hosts[1]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[2] as bad as hosts[1] +setScores(dsnitch, 4, allHosts, 15, 20, 10); +order = full(hosts[3], hosts[1], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] the worst +setScores(dsnitch, 10, allHosts, 10, 10, 30); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] equal to the others +setScores(dsnitch, 15, allHosts, 10, 10, 10); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); /// Tests CASSANDRA-6683 improvements // make the scores differ enough from the ideal order that we sort by score; under the old // dynamic snitch behavior 
(where we only compared neighbors), these wouldn't get sorted -setScores(dsnitch, 20, hosts, 10, 70, 20); -order = full(host1, host3, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +dsnitch.reset(); +setScores(dsnitch, 20, allHosts, 10, 70, 20); +order = full(hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +order = full(hosts[4], hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(s
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r236752030 --- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java --- @@ -60,57 +130,229 @@ private static EndpointsForRange full(InetAddressAndPort... endpoints) } @Test -public void testSnitch() throws InterruptedException, IOException, ConfigurationException +public void testSortedByProximity() throws InterruptedException, IOException, ConfigurationException { -// do this because SS needs to be initialized before DES can work properly. -StorageService.instance.unsafeInitialize(); -SimpleSnitch ss = new SimpleSnitch(); -DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); -InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); -InetAddressAndPort host1 = InetAddressAndPort.getByName("127.0.0.2"); -InetAddressAndPort host2 = InetAddressAndPort.getByName("127.0.0.3"); -InetAddressAndPort host3 = InetAddressAndPort.getByName("127.0.0.4"); -InetAddressAndPort host4 = InetAddressAndPort.getByName("127.0.0.5"); -List hosts = Arrays.asList(host1, host2, host3); +InetAddressAndPort self = hosts[0]; +List allHosts = Arrays.asList(hosts[1], hosts[2], hosts[3]); // first, make all hosts equal -setScores(dsnitch, 1, hosts, 10, 10, 10); -EndpointsForRange order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host1 a little worse -setScores(dsnitch, 1, hosts, 20, 10, 10); -order = full(host2, host3, host1); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host2 as bad as host1 -setScores(dsnitch, 2, hosts, 15, 20, 10); -order = full(host3, host1, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 the worst -setScores(dsnitch, 3, hosts, 10, 10, 30); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - -// make host3 equal to the others -setScores(dsnitch, 5, hosts, 10, 10, 10); -order = full(host1, host2, host3); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +setScores(dsnitch, 1, allHosts, 10, 10, 10); +EndpointsForRange order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[1] a little worse +setScores(dsnitch, 2, allHosts, 20, 10, 10); +order = full(hosts[2], hosts[3], hosts[1]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[2] as bad as hosts[1] +setScores(dsnitch, 4, allHosts, 15, 20, 10); +order = full(hosts[3], hosts[1], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] the worst +setScores(dsnitch, 10, allHosts, 10, 10, 30); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +// make hosts[3] equal to the others +setScores(dsnitch, 15, allHosts, 10, 10, 10); +order = full(hosts[1], hosts[2], hosts[3]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); /// Tests CASSANDRA-6683 improvements // make the scores differ enough from the ideal order that we sort by score; under the old // dynamic snitch behavior 
(where we only compared neighbors), these wouldn't get sorted -setScores(dsnitch, 20, hosts, 10, 70, 20); -order = full(host1, host3, host2); -Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); +dsnitch.reset(); +setScores(dsnitch, 20, allHosts, 10, 70, 20); +order = full(hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(self, full(hosts[1], hosts[2], hosts[3]))); + +order = full(hosts[4], hosts[1], hosts[3], hosts[2]); +Util.assertRCEquals(order, dsnitch.sortedByProximity(s
[GitHub] cassandra pull request #288: In BufferPool, make allocating thread receive a...
Github user jonmeredith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/288#discussion_r236362932 --- Diff: src/java/org/apache/cassandra/utils/memory/BufferPool.java --- @@ -237,23 +237,25 @@ void check() /** Return a chunk, the caller will take owership of the parent chunk. */ public Chunk get() { -while (true) -{ -Chunk chunk = chunks.poll(); -if (chunk != null) -return chunk; +Chunk chunk = chunks.poll(); +if (chunk != null) +return chunk; -if (!allocateMoreChunks()) -// give it one last attempt, in case someone else allocated before us -return chunks.poll(); -} +chunk = allocateMoreChunks(); +if (chunk != null) +return chunk; + +/* another thread may have just allocated last macro chunk, so +** make one final attempt before returning null --- End diff -- thx for the pointers, updated to a single line. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
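For reference, a minimal model of the "poll, try to allocate, poll once more" pattern the quoted BufferPool change implements. The queue, the chunk type, and the reason an allocation attempt can fail are stand-ins here, not the real BufferPool internals:
```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChunkQueueSketch
{
    private final ConcurrentLinkedQueue<String> chunks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean allocating = new AtomicBoolean(false);

    /** Pretend to allocate a macro chunk; in this sketch only one thread may allocate at a time. */
    private String allocateMoreChunks()
    {
        if (!allocating.compareAndSet(false, true))
            return null; // another thread is (or just was) allocating
        try
        {
            for (int i = 0; i < 3; i++)
                chunks.add("chunk"); // publish the extra chunks for other threads
            return "chunk";          // caller takes ownership of one chunk directly
        }
        finally
        {
            allocating.set(false);
        }
    }

    public String get()
    {
        String chunk = chunks.poll();
        if (chunk != null)
            return chunk;

        chunk = allocateMoreChunks();
        if (chunk != null)
            return chunk;

        // another thread may have just allocated the last macro chunk,
        // so make one final attempt before returning null
        return chunks.poll();
    }

    public static void main(String[] args)
    {
        ChunkQueueSketch pool = new ChunkQueueSketch();
        System.out.println(pool.get()); // allocates and returns a chunk
        System.out.println(pool.get()); // served from the shared queue
    }
}
```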
[GitHub] cassandra pull request #294: Fixing logging of netty client related ioexcept...
GitHub user sumanth-pasupuleti opened a pull request: https://github.com/apache/cassandra/pull/294 Fixing logging of netty client related ioexceptions to be at trace level You can merge this pull request into a Git repository by running: $ git pull https://github.com/sumanth-pasupuleti/cassandra debuglevelioexceptions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/294.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #294 commit 3cd3cbe7233123f0c0461b11124a7f45c7df449e Author: sumanthpasupuleti Date: 2018-11-21T17:11:06Z Fixing logging of netty client related ioexceptions to be at trace level --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher
Github user belliottsmith commented on a diff in the pull request: https://github.com/apache/cassandra/pull/293#discussion_r235729649 --- Diff: conf/cassandra.yaml --- @@ -1003,3 +1003,9 @@ windows_timer_interval: 1 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions. # # otc_backlog_expiration_interval_ms: 200 + +# Define use of immediate flusher for replies to TCP connections. This is an alternate simplified flusher that does not +# depend on any event loop scheduling. Details around why this has been backported from trunk: CASSANDRA-14855. +# Default is false. +# native_transport_use_immediate_flusher: false --- End diff -- perhaps native_transport_flush_messages_immediately ? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher
Github user sumanth-pasupuleti commented on a diff in the pull request: https://github.com/apache/cassandra/pull/293#discussion_r235090149 --- Diff: conf/cassandra.yaml --- @@ -1003,3 +1003,9 @@ windows_timer_interval: 1 # An interval of 0 disables any wait time, which is the behavior of former Cassandra versions. # # otc_backlog_expiration_interval_ms: 200 + +# Define use of immediate flusher for replies to TCP connections. This is an alternate simplified flusher that does not +# depend on any event loop scheduling. Details around why this has been backported from trunk: CASSANDRA-14855. +# Default is false. +# native_transport_use_immediate_flusher: false --- End diff -- The other, more descriptive name I had in mind was "native_transport_use_immediate_flusher_in_place_of_batches_flusher", but I felt it would be too long a name. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher
GitHub user sumanth-pasupuleti opened a pull request: https://github.com/apache/cassandra/pull/293 14855 - 3.0 backport immediate flusher You can merge this pull request into a Git repository by running: $ git pull https://github.com/sumanth-pasupuleti/cassandra 3.0_backport_immediate_flusher Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/293.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #293 commit 713825b33c85099484991219e43e863a804ce96b Author: Michael Burman Date: 2018-05-08T12:40:54Z Backporting ImmediateFlusher from trunk commit a6d4e517716a08b72166f226751c7e97d392750a Author: sumanthpasupuleti Date: 2018-11-20T16:17:01Z Making immediate flusher OFF by default --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...
GitHub user sushmaad opened a pull request: https://github.com/apache/cassandra/pull/292 provide ZSTD compression support to cassandra (CASSANDRA-14482) You can merge this pull request into a Git repository by running: $ git pull https://github.com/sushmaad/cassandra 14482-4.0 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/292.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #292 commit fcdf9e66c6d688eff23c5147b5ea94584d871d6d Author: Sushma Devendrappa Date: 2018-11-19T23:43:50Z provide ZSTD compression support to cassandra (CASSANDRA-14482) --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user sumanth-pasupuleti commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r234413324 --- Diff: src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java --- @@ -307,6 +320,55 @@ public static void setViewRemoved(String keyspaceName, String viewName) forceBlockingFlush(VIEW_BUILD_STATUS); } +/** + * Reads blacklisted partitions from system_distributed.blacklisted_partitions table. + * Stops reading partitions upon exceeding the cache size limit by logging a warning. + * @return + */ +public static Set getBlacklistedPartitions() +{ +String query = "SELECT keyspace_name, columnfamily_name, partition_key FROM %s.%s"; +UntypedResultSet results; +try +{ +results = QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, BLACKLISTED_PARTITIONS), --- End diff -- The execute() call can be replaced with executeInternalWithPaging(), but with paging or otherwise we will still end up holding the entire result set in memory through the "results" variable here, won't we? Do you think paging would still help? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
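A short sketch of the point raised in the comment above (BlacklistLoadSketch and its Row type are hypothetical stand-ins, not the real UntypedResultSet or QueryProcessor API): paging only reduces memory pressure if the reader stops consuming once the cache size limit is hit, rather than collecting every row into one list.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class BlacklistLoadSketch
{
    interface Row {} // hypothetical stand-in for a result-set row

    static List<Row> loadWithCap(Iterator<Row> pagedRows, int maxEntries)
    {
        List<Row> partitions = new ArrayList<>();
        while (pagedRows.hasNext())
        {
            if (partitions.size() >= maxEntries)
            {
                System.err.println("blacklisted partitions exceed the cache size limit; ignoring the rest");
                break; // stop consuming further pages instead of buffering them all
            }
            partitions.add(pagedRows.next());
        }
        return partitions;
    }
}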
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user sumanth-pasupuleti commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r234413081 --- Diff: src/java/org/apache/cassandra/service/CassandraDaemon.java --- @@ -426,6 +426,9 @@ public void uncaughtException(Thread t, Throwable e) // Native transport nativeTransportService = new NativeTransportService(); +// load blacklisted partitions into cache +StorageService.instance.refreshBlacklistedPartitionsCache(); --- End diff -- Yeah, I think the first node on the new version would create the table. Either way, it wouldn't be a catastrophic failure for server start, since a failure while reading blacklisted partitions is handled. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #291: Fix typo
GitHub user kaito0228 opened a pull request: https://github.com/apache/cassandra/pull/291 Fix typo **I fixed two typos.** You can merge this pull request into a Git repository by running: $ git pull https://github.com/kaito0228/cassandra FixTypo Alternatively you can review and apply these changes as the patch at: https://github.com/apache/cassandra/pull/291.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #291 commit 15557ca5566523b7b917dad5b481a0f56da52f12 Author: ryo.s Date: 2018-11-16T15:50:08Z Fix typo --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch closed the pull request at: https://github.com/apache/cassandra/pull/212 --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user aweisberg commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r232731480 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + --- End diff -- The reason I thought it was strange was because -1 was a valid config option. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
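A small sketch of the sentinel semantics under discussion (SkipWaitSketch uses simplified types and is not the project's checker class): a negative timeout is treated as a deliberate "skip the wait" value, so the blocking step returns immediately instead of clamping the timeout up to one second.

import java.util.Set;
import java.util.concurrent.TimeUnit;

final class SkipWaitSketch
{
    private final long timeoutNanos;

    SkipWaitSketch(long timeoutSecs)
    {
        // keep the negative sentinel rather than clamping it to a positive value
        this.timeoutNanos = timeoutSecs < 0 ? -1 : TimeUnit.SECONDS.toNanos(timeoutSecs);
    }

    boolean execute(Set<String> peers)
    {
        if (peers == null || timeoutNanos < 0)
            return true; // nothing to block on, or blocking explicitly disabled
        // ... a real checker would wait on per-datacenter latches here ...
        return true;
    }
}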
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r232436329 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -154,31 +326,203 @@ private void registerMBean() public void close() { -updateSchedular.cancel(false); -resetSchedular.cancel(false); +if (updateScoresScheduler != null) +updateScoresScheduler.cancel(false); +if (updateSamplesScheduler != null) +updateSamplesScheduler.cancel(false); + +for (AnnotatedMeasurement measurement : samples.values()) +{ +if (measurement.probeFuture != null) +measurement.probeFuture.cancel(false); + +measurement.millisSinceLastMeasure.set(0); +measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS); +measurement.probeTimerMillis = 0; +} MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); try { -mbs.unregisterMBean(new ObjectName(mbeanName)); +if (mbeanRegistered) +mbs.unregisterMBean(new ObjectName(mbeanName)); } catch (Exception e) { throw new RuntimeException(e); } } +/** + * Background task running on the samples dictionary. The default implementation sends latency probes (PING) + * messages to explore nodes that we have not received timings for recently but have ranked in + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. + */ +protected void updateSamples() +{ +// Split calculation of probe timers from sending probes for testability +calculateProbes(samples, dynamicLatencyProbeInterval); + +if (!StorageService.instance.isGossipActive()) +return; + +schedulePings(samples); --- End diff -- Sure, done. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r232436157 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -23,128 +23,300 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.codahale.metrics.Snapshot; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); - -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; - -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); +protected final Map samples = new ConcurrentHashMap<>(); + +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY); + +// User confi
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r232435926 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -23,128 +23,300 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.codahale.metrics.Snapshot; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); - -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; - -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); +protected final Map samples = new ConcurrentHashMap<>(); + +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY); + +// User confi
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r232435703 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -23,128 +23,300 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.codahale.metrics.Snapshot; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); - -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; - -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); +protected final Map samples = new ConcurrentHashMap<>(); + +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY); + +// User confi
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r232435653 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -23,128 +23,300 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.codahale.metrics.Snapshot; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { -private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); - -private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values -private static final int WINDOW_SIZE = 100; - -private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); -private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); -private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); +private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + +// Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking +protected boolean registered = false; +protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); +protected volatile Map scores = new HashMap<>(); +protected final Map samples = new ConcurrentHashMap<>(); + +// Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking +public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); +public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; +// The probe rate is set later when configuration is read +protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); +protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY); + +// User confi
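The probe interval constants in the diff above are read from JVM system properties. A minimal sketch of that lookup pattern (ProbeIntervalSketch is illustrative only; the property names are taken from the quoted diff): Long.getLong reads a -D system property and falls back to the supplied default when it is unset.

final class ProbeIntervalSketch
{
    // e.g. -Dcassandra.dynamic_snitch_max_probe_interval_ms=300000 overrides the 10-minute default
    static final long MAX_PROBE_INTERVAL_MS =
        Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
    static final long MIN_PROBE_INTERVAL_MS =
        Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L);

    public static void main(String[] args)
    {
        System.out.println("max probe interval ms = " + MAX_PROBE_INTERVAL_MS
                           + ", min probe interval ms = " + MIN_PROBE_INTERVAL_MS);
    }
}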
[GitHub] cassandra pull request #284: Expose schema in virtual table for CASSANDRA-14...
Github user clohfink commented on a diff in the pull request: https://github.com/apache/cassandra/pull/284#discussion_r232325693 --- Diff: src/java/org/apache/cassandra/db/virtual/DescribeTables.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Streams; + +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.functions.FunctionName; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SchemaCQLHelper; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.schema.Functions; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Types; +import org.apache.cassandra.schema.ViewMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class DescribeTables +{ +private static final String KEYSPACE = "keyspace_name"; +private static final String CQL = "cql"; + +private static final CompositeType utfComposite = CompositeType.getInstance(UTF8Type.instance, UTF8Type.instance); + +public static Collection getAll(String name) +{ +return ImmutableList.of(new DescribeKeyspaceTable(name), +new DescribeIndexesTable(name), +new DescribeTypesTable(name), +new DescribeAggregatesTable(name), +new DescribeFunctionsTable(name), +new DescribeViewsTable(name), +new DescribeTablesTable(name)); +} + +static final class DescribeKeyspaceTable extends AbstractVirtualTable +{ +DescribeKeyspaceTable(String keyspace) +{ +super(TableMetadata.builder(keyspace, "describe_keyspace") + .comment("cql for keyspace metadata") + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(UTF8Type.instance)) + .addPartitionKeyColumn(KEYSPACE, UTF8Type.instance) + .addRegularColumn(CQL, UTF8Type.instance) + .build()); +} + +@Override +public DataSet data(DecoratedKey partitionKey) +{ +String keyspace = UTF8Type.instance.compose(partitionKey.getKey()); + +SimpleDataSet result = new SimpleDataSet(metadata()); +result.row(keyspace) + .column(CQL, SchemaCQLHelper.getKeyspaceAsCQL(keyspace)); +return result; +} + 
+public DataSet data() +{ +SimpleDataSet result = new SimpleDataSet(metadata()); +for (String keyspace : Schema.instance.getKeyspaces()) +{ +result.row(keyspace) + .column(CQL, SchemaCQLHelper.getKeyspaceAsCQL(keyspace)); +} +return result; +} +} + +static abstract class AbstractDescribeTable extends AbstractVirtualTable
[GitHub] cassandra pull request #284: Expose schema in virtual table for CASSANDRA-14...
Github user JeremiahDJordan commented on a diff in the pull request: https://github.com/apache/cassandra/pull/284#discussion_r232303844 --- Diff: src/java/org/apache/cassandra/db/virtual/DescribeTables.java --- @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Streams; + +import org.apache.cassandra.cql3.functions.Function; +import org.apache.cassandra.cql3.functions.FunctionName; +import org.apache.cassandra.cql3.functions.UDAggregate; +import org.apache.cassandra.cql3.functions.UDFunction; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SchemaCQLHelper; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.schema.Functions; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.Types; +import org.apache.cassandra.schema.ViewMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class DescribeTables +{ +private static final String KEYSPACE = "keyspace_name"; +private static final String CQL = "cql"; + +private static final CompositeType utfComposite = CompositeType.getInstance(UTF8Type.instance, UTF8Type.instance); + +public static Collection getAll(String name) +{ +return ImmutableList.of(new DescribeKeyspaceTable(name), +new DescribeIndexesTable(name), +new DescribeTypesTable(name), +new DescribeAggregatesTable(name), +new DescribeFunctionsTable(name), +new DescribeViewsTable(name), +new DescribeTablesTable(name)); +} + +static final class DescribeKeyspaceTable extends AbstractVirtualTable +{ +DescribeKeyspaceTable(String keyspace) +{ +super(TableMetadata.builder(keyspace, "describe_keyspace") + .comment("cql for keyspace metadata") + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(UTF8Type.instance)) + .addPartitionKeyColumn(KEYSPACE, UTF8Type.instance) + .addRegularColumn(CQL, UTF8Type.instance) + .build()); +} + +@Override +public DataSet data(DecoratedKey partitionKey) +{ +String keyspace = UTF8Type.instance.compose(partitionKey.getKey()); + +SimpleDataSet result = new SimpleDataSet(metadata()); +result.row(keyspace) + .column(CQL, SchemaCQLHelper.getKeyspaceAsCQL(keyspace)); +return 
result; +} + +public DataSet data() +{ +SimpleDataSet result = new SimpleDataSet(metadata()); +for (String keyspace : Schema.instance.getKeyspaces()) +{ +result.row(keyspace) + .column(CQL, SchemaCQLHelper.getKeyspaceAsCQL(keyspace)); +} +return result; +} +} + +static abstract class AbstractDescribeTable extends AbstractVirtualTa
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231711085 --- Diff: src/java/org/apache/cassandra/config/Config.java --- @@ -376,9 +376,31 @@ public String full_query_log_dir = null; -// parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive -public int block_for_peers_percentage = 70; +/** + * When a node first starts up it intially thinks all other peers are DOWN, and then as the initial gossip + * broadcast messages comes back nodes transition to UP. These options configure how many nodes can remain in + * DOWN state before we make this node available as a coordinator, as well as an overall timeout on this process + * to ensure that startup is not delayed too much. + * + * The defaults are tuned for LOCAL_ONE consistency levels with RF=3, and have natural settings for other CLs: + * + * Consistency Level | local_dc | all_dcs + * + * LOCAL_ONE | default (2) | default (any) + * LOCAL_QUORUM | 1| default (any) + * ONE | any | RF - 1 + * QUORUM| any | (RF / 2) - 1 + * ALL | default | 0 + * + * A concrete example with QUORUM would be if you have 3 replicas in 2 datacenters, then you would set + * block_for_peers_all_dcs to (6 / 2) - 1 = 2 because that guarantees that at most 2 hosts in all datacenters + * are down when you start taking client traffic, which should satistfy QUORUM for all RF=6 QUORUM queries. + */ +public int block_for_peers_local_dc = 2; --- End diff -- We ended up going the direction of a boolean. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
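The arithmetic behind the quoted config comment can be made explicit with a small sketch (QuorumBlockSketch is illustrative only; the numeric options were ultimately replaced by a boolean, as noted above): a QUORUM read or write needs floor(RF/2) + 1 replicas alive, so at most RF minus that quorum may be DOWN when the node starts taking traffic.

final class QuorumBlockSketch
{
    // maximum peers that may remain DOWN while still satisfying QUORUM across all DCs
    static int maxDownPeersForQuorum(int replicationFactor)
    {
        int quorum = replicationFactor / 2 + 1;
        return replicationFactor - quorum;
    }

    public static void main(String[] args)
    {
        // the RF=6 example from the quoted comment: quorum is 4, so 2 peers may be down
        System.out.println(maxDownPeersForQuorum(6)); // prints 2
    }
}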
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231705256 --- Diff: test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java --- @@ -69,33 +113,102 @@ public void tearDown() @Test public void execute_HappyPath() { -Sink sink = new Sink(true, true); +Sink sink = new Sink(true, true, peers); MessagingService.instance().addMessageSink(sink); -Assert.assertTrue(connectivityChecker.execute(peers)); +Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); checkAllConnectionTypesSeen(sink); } @Test public void execute_NotAlive() { -Sink sink = new Sink(false, true); +Sink sink = new Sink(false, true, peers); MessagingService.instance().addMessageSink(sink); -Assert.assertFalse(connectivityChecker.execute(peers)); +Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); checkAllConnectionTypesSeen(sink); } @Test public void execute_NoConnectionsAcks() { -Sink sink = new Sink(true, false); +Sink sink = new Sink(true, false, peers); MessagingService.instance().addMessageSink(sink); -Assert.assertFalse(connectivityChecker.execute(peers)); +Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); +} + +@Test +public void execute_LocalQuorum() +{ +// local peer plus 3 peers from same dc shouldn't pass (4/6) +Set available = new HashSet<>(); +copyCount(peersAMinusLocal, available, NUM_PER_DC - 3); +checkAvailable(localQuorumConnectivityChecker, available, false, true); + +// local peer plus 4 peers from same dc should pass (5/6) +available.clear(); +copyCount(peersAMinusLocal, available, NUM_PER_DC - 2); +checkAvailable(localQuorumConnectivityChecker, available, true, true); +} + +@Test +public void execute_GlobalQuorum() +{ +// local dc passing shouldn'nt pass globally with two hosts down in datacenterB +Set available = new HashSet<>(); +copyCount(peersAMinusLocal, available, NUM_PER_DC - 2); +copyCount(peersB, available, NUM_PER_DC - 2); +copyCount(peersC, available, NUM_PER_DC - 1); +checkAvailable(globalQuorumConnectivityChecker, available, false, true); + +available.clear(); +copyCount(peersAMinusLocal, available, NUM_PER_DC -2); +copyCount(peersB, available, NUM_PER_DC - 1); +copyCount(peersC, available, NUM_PER_DC - 1); +checkAvailable(globalQuorumConnectivityChecker, available, true, true); +} + +@Test +public void execute_Noop() +{ +checkAvailable(noopChecker, new HashSet<>(), true, false); +} + +@Test +public void execute_ZeroWait() +{ +checkAvailable(zeroWaitChecker, new HashSet<>(), false, false); --- End diff -- Makes sense, I refactored this test to look at that instead of just returning immediately. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231697341 --- Diff: test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java --- @@ -69,33 +113,102 @@ public void tearDown() @Test public void execute_HappyPath() { -Sink sink = new Sink(true, true); +Sink sink = new Sink(true, true, peers); MessagingService.instance().addMessageSink(sink); -Assert.assertTrue(connectivityChecker.execute(peers)); +Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); checkAllConnectionTypesSeen(sink); } @Test public void execute_NotAlive() { -Sink sink = new Sink(false, true); +Sink sink = new Sink(false, true, peers); MessagingService.instance().addMessageSink(sink); -Assert.assertFalse(connectivityChecker.execute(peers)); +Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); checkAllConnectionTypesSeen(sink); } @Test public void execute_NoConnectionsAcks() { -Sink sink = new Sink(true, false); +Sink sink = new Sink(true, false, peers); MessagingService.instance().addMessageSink(sink); -Assert.assertFalse(connectivityChecker.execute(peers)); +Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, this::getDatacenter)); +} + +@Test +public void execute_LocalQuorum() +{ +// local peer plus 3 peers from same dc shouldn't pass (4/6) +Set available = new HashSet<>(); +copyCount(peersAMinusLocal, available, NUM_PER_DC - 3); +checkAvailable(localQuorumConnectivityChecker, available, false, true); + +// local peer plus 4 peers from same dc should pass (5/6) +available.clear(); +copyCount(peersAMinusLocal, available, NUM_PER_DC - 2); +checkAvailable(localQuorumConnectivityChecker, available, true, true); +} + +@Test +public void execute_GlobalQuorum() +{ +// local dc passing shouldn'nt pass globally with two hosts down in datacenterB --- End diff -- Ack, fixed. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231696993 --- Diff: test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java --- @@ -36,13 +36,34 @@ import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.HeartBeatState; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; public class StartupClusterConnectivityCheckerTest { -private StartupClusterConnectivityChecker connectivityChecker; +private StartupClusterConnectivityChecker localQuorumConnectivityChecker; +private StartupClusterConnectivityChecker globalQuorumConnectivityChecker; +private StartupClusterConnectivityChecker noopChecker; +private StartupClusterConnectivityChecker zeroWaitChecker; + +private static final int NUM_PER_DC = 6; private Set peers; +private Set peersA; +private Set peersAMinusLocal; +private Set peersB; +private Set peersC; + +private String getDatacenter(InetAddressAndPort endpoint) +{ +if (peersA.contains(endpoint)) +return "datacenterA"; +if (peersB.contains(endpoint)) +return "datacenterB"; +else if (peersC.contains(endpoint)) +return "datacenterC"; +return "NA"; --- End diff -- Right, so it'll fail fast, done. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
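A tiny sketch of the fail-fast idea agreed on in the comment above (DatacenterLookupSketch is a hypothetical helper, not the test's code): throw on an unknown endpoint rather than silently returning a placeholder datacenter, so a test or setup bug surfaces immediately.

import java.util.Map;

final class DatacenterLookupSketch
{
    static String datacenterOf(Map<String, String> endpointToDc, String endpoint)
    {
        String dc = endpointToDc.get(endpoint);
        if (dc == null)
            // fail fast instead of returning a placeholder such as "NA"
            throw new IllegalStateException("unknown datacenter for endpoint " + endpoint);
        return dc;
    }
}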
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231696207 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + +" the first user query"); if (timeoutSecs > 100) logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); -return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); +return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs); } @VisibleForTesting -StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) +StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs) { -this.targetPercent = Math.min(100, Math.max(0, targetPercent)); +this.blockForRemoteDcs = blockForRemoteDcs; this.timeoutNanos = timeoutNanos; } /** * @param peers The currently known peers in the cluster; argument is not modified. + * @param getDatacenterSource A function for mapping peers to their datacenter. * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. 
*/ -public boolean execute(Set peers) +public boolean execute(Set peers, Function getDatacenterSource) { -if (targetPercent == 0 || peers == null) +if (peers == null || this.timeoutNanos < 0) return true; // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); -peers.remove(FBUtilities.getBroadcastAddressAndPort()); +InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +String localDc = getDatacenterSource.apply(localAddress); +peers.remove(localAddress); if (peers.isEmpty()) return true; -logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", -targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such) +Map datacenterMap = peers.stream() + .collect(Collectors.toMap(k -> k, getDatacenterSource)); +Function getDatacenter = datacenterMap::get; -long startNanos = System.nanoTime(); +Map> peersByDc = peers.stream() + .collect(Collectors.groupingBy(getDatacenter, + Collectors.toSet())); + +if (!blockForRemoteDcs) +{ +peersByDc.keySet().retainAll(Collections.singleton(localDc)); +logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} +else +{ +logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} AckMap acks = new AckMap(3); -int target = (int) ((targetPercent / 100.0) * peers.size()); -CountDownLatch latch = new CountDownLatch(target); +Map latchMap = new HashMap<>(peersByDc.size()); +for (Map.Entry> entry: peersByDc.entrySet()) +{ +latchMap.put(entry.getKey(), new CountDownLatch(Math.max(entry.getValue().size() - 1, 0))); +
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231695792 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + +" the first user query"); if (timeoutSecs > 100) logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); -return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); +return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs); } @VisibleForTesting -StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) +StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs) { -this.targetPercent = Math.min(100, Math.max(0, targetPercent)); +this.blockForRemoteDcs = blockForRemoteDcs; this.timeoutNanos = timeoutNanos; } /** * @param peers The currently known peers in the cluster; argument is not modified. + * @param getDatacenterSource A function for mapping peers to their datacenter. * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. */ -public boolean execute(Set peers) +public boolean execute(Set peers, Function getDatacenterSource) { -if (targetPercent == 0 || peers == null) +if (peers == null || this.timeoutNanos < 0) return true; // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); -peers.remove(FBUtilities.getBroadcastAddressAndPort()); +InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +String localDc = getDatacenterSource.apply(localAddress); +peers.remove(localAddress); if (peers.isEmpty()) return true; -logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", -targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such) +Map datacenterMap = peers.stream() + .collect(Collectors.toMap(k -> k, getDatacenterSource)); +Function getDatacenter = datacenterMap::get; -long startNanos = System.nanoTime(); +Map> peersByDc = peers.stream() + .collect(Collectors.groupingBy(getDatacenter, + Collectors.toSet())); + +if (!blockForRemoteDcs) +{ +peersByDc.keySet().retainAll(Collections.singleton(localDc)); --- End diff -- Ack, added the comment. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231695413 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + +" the first user query"); if (timeoutSecs > 100) logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); -return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); +return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs); } @VisibleForTesting -StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) +StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs) { -this.targetPercent = Math.min(100, Math.max(0, targetPercent)); +this.blockForRemoteDcs = blockForRemoteDcs; this.timeoutNanos = timeoutNanos; } /** * @param peers The currently known peers in the cluster; argument is not modified. + * @param getDatacenterSource A function for mapping peers to their datacenter. * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. 
*/ -public boolean execute(Set peers) +public boolean execute(Set peers, Function getDatacenterSource) { -if (targetPercent == 0 || peers == null) +if (peers == null || this.timeoutNanos < 0) return true; // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); -peers.remove(FBUtilities.getBroadcastAddressAndPort()); +InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +String localDc = getDatacenterSource.apply(localAddress); +peers.remove(localAddress); if (peers.isEmpty()) return true; -logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", -targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such) +Map datacenterMap = peers.stream() + .collect(Collectors.toMap(k -> k, getDatacenterSource)); +Function getDatacenter = datacenterMap::get; -long startNanos = System.nanoTime(); +Map> peersByDc = peers.stream() + .collect(Collectors.groupingBy(getDatacenter, + Collectors.toSet())); + +if (!blockForRemoteDcs) +{ +peersByDc.keySet().retainAll(Collections.singleton(localDc)); +logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} +else +{ +logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} AckMap acks = new AckMap(3); -int target = (int) ((targetPercent / 100.0) * peers.size()); -CountDownLatch latch = new CountDownLatch(target); +Map latchMap = new HashMap<>(peersByDc.size()); --- End diff -- I called it `dcToRemainingPeers` to try to convey what it was counting down. Let me know if that's not clear and I'll change it. --- -
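A simplified standalone sketch of the per-datacenter countdown built in the diff above (PerDcLatchSketch uses plain strings in place of InetAddressAndPort and mirrors the dcToRemainingPeers name mentioned in the comment): peers are grouped by datacenter, remote datacenters are optionally dropped, and each remaining datacenter gets a latch sized so that startup proceeds once all but one of its peers has been seen alive and connected.

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

final class PerDcLatchSketch
{
    static Map<String, CountDownLatch> dcToRemainingPeers(Set<String> peers,
                                                          Function<String, String> dcOf,
                                                          String localDc,
                                                          boolean blockForRemoteDcs)
    {
        // group the known peers by datacenter
        Map<String, Set<String>> peersByDc = new HashMap<>();
        for (String peer : peers)
            peersByDc.computeIfAbsent(dcOf.apply(peer), dc -> new HashSet<>()).add(peer);

        // optionally only wait on the local datacenter
        if (!blockForRemoteDcs)
            peersByDc.keySet().retainAll(Collections.singleton(localDc));

        // each datacenter may keep at most one peer DOWN before startup proceeds,
        // so the latch counts down from (peers in DC - 1)
        Map<String, CountDownLatch> remaining = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : peersByDc.entrySet())
            remaining.put(e.getKey(), new CountDownLatch(Math.max(e.getValue().size() - 1, 0)));
        return remaining;
    }
}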
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231693230 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + +" the first user query"); if (timeoutSecs > 100) logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); -return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); +return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs); } @VisibleForTesting -StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) +StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs) { -this.targetPercent = Math.min(100, Math.max(0, targetPercent)); +this.blockForRemoteDcs = blockForRemoteDcs; this.timeoutNanos = timeoutNanos; } /** * @param peers The currently known peers in the cluster; argument is not modified. + * @param getDatacenterSource A function for mapping peers to their datacenter. * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. 
*/ -public boolean execute(Set peers) +public boolean execute(Set peers, Function getDatacenterSource) { -if (targetPercent == 0 || peers == null) +if (peers == null || this.timeoutNanos < 0) return true; // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); -peers.remove(FBUtilities.getBroadcastAddressAndPort()); +InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +String localDc = getDatacenterSource.apply(localAddress); +peers.remove(localAddress); if (peers.isEmpty()) return true; -logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", -targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such) +Map datacenterMap = peers.stream() + .collect(Collectors.toMap(k -> k, getDatacenterSource)); +Function getDatacenter = datacenterMap::get; -long startNanos = System.nanoTime(); +Map> peersByDc = peers.stream() + .collect(Collectors.groupingBy(getDatacenter, + Collectors.toSet())); + +if (!blockForRemoteDcs) +{ +peersByDc.keySet().retainAll(Collections.singleton(localDc)); +logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} +else +{ +logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} AckMap acks = new AckMap(3); -int target = (int) ((targetPercent / 100.0) * peers.size()); -CountDownLatch latch = new CountDownLatch(target); +Map latchMap = new HashMap<>(peersByDc.size()); +for (Map.Entry> entry: peersByDc.entrySet()) +{ +latchMap.put(entry.getKey(), new CountDownLatch(Math.max(entry.getValue().size() - 1, 0))); +
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231691984 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + +" the first user query"); if (timeoutSecs > 100) logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); -return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); +return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs); } @VisibleForTesting -StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) +StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs) { -this.targetPercent = Math.min(100, Math.max(0, targetPercent)); +this.blockForRemoteDcs = blockForRemoteDcs; this.timeoutNanos = timeoutNanos; } /** * @param peers The currently known peers in the cluster; argument is not modified. + * @param getDatacenterSource A function for mapping peers to their datacenter. * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. 
*/ -public boolean execute(Set peers) +public boolean execute(Set peers, Function getDatacenterSource) { -if (targetPercent == 0 || peers == null) +if (peers == null || this.timeoutNanos < 0) return true; // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); -peers.remove(FBUtilities.getBroadcastAddressAndPort()); +InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +String localDc = getDatacenterSource.apply(localAddress); +peers.remove(localAddress); if (peers.isEmpty()) return true; -logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", -targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such) +Map datacenterMap = peers.stream() + .collect(Collectors.toMap(k -> k, getDatacenterSource)); +Function getDatacenter = datacenterMap::get; -long startNanos = System.nanoTime(); +Map> peersByDc = peers.stream() + .collect(Collectors.groupingBy(getDatacenter, + Collectors.toSet())); + +if (!blockForRemoteDcs) +{ +peersByDc.keySet().retainAll(Collections.singleton(localDc)); +logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} +else +{ +logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s", +TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +} AckMap acks = new AckMap(3); -int target = (int) ((targetPercent / 100.0) * peers.size()); -CountDownLatch latch = new CountDownLatch(target); +Map latchMap = new HashMap<>(peersByDc.size()); +for (Map.Entry> entry: peersByDc.entrySet()) +{ +latchMap.put(entry.getKey(), new CountDownLatch(Math.max(entry.getValue().size() - 1, 0))); +
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231688792 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + +" the first user query"); if (timeoutSecs > 100) logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); -return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); +return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs); } @VisibleForTesting -StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) +StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs) { -this.targetPercent = Math.min(100, Math.max(0, targetPercent)); +this.blockForRemoteDcs = blockForRemoteDcs; this.timeoutNanos = timeoutNanos; } /** * @param peers The currently known peers in the cluster; argument is not modified. + * @param getDatacenterSource A function for mapping peers to their datacenter. * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. */ -public boolean execute(Set peers) +public boolean execute(Set peers, Function getDatacenterSource) { -if (targetPercent == 0 || peers == null) +if (peers == null || this.timeoutNanos < 0) return true; // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); -peers.remove(FBUtilities.getBroadcastAddressAndPort()); +InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +String localDc = getDatacenterSource.apply(localAddress); +peers.remove(localAddress); if (peers.isEmpty()) return true; -logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", -targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such) +Map datacenterMap = peers.stream() + .collect(Collectors.toMap(k -> k, getDatacenterSource)); +Function getDatacenter = datacenterMap::get; -long startNanos = System.nanoTime(); +Map> peersByDc = peers.stream() --- End diff -- Sure, done. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231688756 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + +" the first user query"); if (timeoutSecs > 100) logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs); long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs); -return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos); +return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs); } @VisibleForTesting -StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos) +StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs) { -this.targetPercent = Math.min(100, Math.max(0, targetPercent)); +this.blockForRemoteDcs = blockForRemoteDcs; this.timeoutNanos = timeoutNanos; } /** * @param peers The currently known peers in the cluster; argument is not modified. + * @param getDatacenterSource A function for mapping peers to their datacenter. * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened; * else false. */ -public boolean execute(Set peers) +public boolean execute(Set peers, Function getDatacenterSource) { -if (targetPercent == 0 || peers == null) +if (peers == null || this.timeoutNanos < 0) return true; // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection) peers = new HashSet<>(peers); -peers.remove(FBUtilities.getBroadcastAddressAndPort()); +InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); +String localDc = getDatacenterSource.apply(localAddress); +peers.remove(localAddress); if (peers.isEmpty()) return true; -logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds", -targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos)); +// make a copy of the datacenter mapping (in case gossip updates happen during this method or some such) +Map datacenterMap = peers.stream() --- End diff -- Sure thing, makes sense and I've made both refactors. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231681120 --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java --- @@ -48,81 +51,133 @@ { private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class); -private final int targetPercent; +private final boolean blockForRemoteDcs; private final long timeoutNanos; -public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs) +public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs) { -timeoutSecs = Math.max(1, timeoutSecs); +if (timeoutSecs < 0) +logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" + --- End diff -- I thought it should be for the same reason we warn them if they set the timeout to more than 100, but I don't care too much. I removed it. --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/212#discussion_r231680735 --- Diff: src/java/org/apache/cassandra/config/Config.java --- @@ -380,9 +380,23 @@ public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue; public int repair_command_pool_size = concurrent_validations; -// parameters to adjust how much to delay startup until a certain amount of the cluster is connect to and marked alive -public int block_for_peers_percentage = 70; +/** + * When a node first starts up it intially considers all other peers as DOWN, and then as the initial gossip + * broadcast messages comes back nodes transition to UP. These options configure how long we wait for peers to + * connect before we make this node available as a coordinator. Furthermore, if this feature is enabled + * (timeout >= 0) Cassandra initiates the non gossip channel internode connections on startup as well and waits --- End diff -- Ok, I reworded it. I think that having an escape hatch makes sense, what do you think? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
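A rough sketch of the escape-hatch semantics being discussed for the block-for-peers options (an assumption-laden illustration only: the field names blockForPeersTimeoutSecs and blockForPeersInRemoteDcs are stand-ins for whatever the final config keys are, and are not taken from the merged patch):

    import java.util.concurrent.TimeUnit;

    public class BlockForPeersConfigSketch
    {
        // Hypothetical config values mirroring the options under discussion.
        static long blockForPeersTimeoutSecs = 10;      // < 0 would mean "skip the startup wait entirely"
        static boolean blockForPeersInRemoteDcs = false;

        static boolean shouldBlock()
        {
            // The escape hatch: operators who cannot tolerate any startup delay set a negative timeout.
            return blockForPeersTimeoutSecs >= 0;
        }

        public static void main(String[] args)
        {
            if (!shouldBlock())
            {
                System.out.println("block-for-peers disabled; the first queries may see errors or timeouts");
                return;
            }
            long timeoutNanos = TimeUnit.SECONDS.toNanos(blockForPeersTimeoutSecs);
            System.out.printf("waiting up to %ds for peers (remote DCs included: %b)%n",
                              TimeUnit.NANOSECONDS.toSeconds(timeoutNanos), blockForPeersInRemoteDcs);
        }
    }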
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r230865548 --- Diff: src/java/org/apache/cassandra/service/CassandraDaemon.java --- @@ -426,6 +426,9 @@ public void uncaughtException(Thread t, Throwable e) // Native transport nativeTransportService = new NativeTransportService(); +// load blacklisted partitions into cache +StorageService.instance.refreshBlacklistedPartitionsCache(); --- End diff -- During a rolling cluster upgrade I'm not sure we can assume this call will succeed; I'm not entirely sure how system_distributed tables behave in that case. Presumably the first node on the new version creates the table? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
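One way to address the rolling-upgrade concern raised here is to make the startup refresh best-effort (a minimal sketch under that assumption; the exception handling shown is illustrative and not what the patch actually does, and the BlacklistCache interface is invented for the example):

    public class StartupBlacklistRefreshSketch
    {
        interface BlacklistCache { void refresh(); }

        // During a rolling upgrade the system_distributed.blacklisted_partitions table may not exist yet,
        // so a failed refresh is logged and startup continues with an empty cache instead of aborting.
        static void refreshSafely(BlacklistCache cache)
        {
            try
            {
                cache.refresh();
            }
            catch (RuntimeException e)
            {
                System.err.println("Could not load blacklisted partitions at startup; continuing with empty cache: " + e);
            }
        }

        public static void main(String[] args)
        {
            refreshSafely(() -> { throw new RuntimeException("table blacklisted_partitions does not exist"); });
            System.out.println("node startup continues");
        }
    }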
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r230865112 --- Diff: src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java --- @@ -307,6 +320,55 @@ public static void setViewRemoved(String keyspaceName, String viewName) forceBlockingFlush(VIEW_BUILD_STATUS); } +/** + * Reads blacklisted partitions from system_distributed.blacklisted_partitions table. + * Stops reading partitions upon exceeding the cache size limit by logging a warning. + * @return + */ +public static Set getBlacklistedPartitions() +{ +String query = "SELECT keyspace_name, columnfamily_name, partition_key FROM %s.%s"; +UntypedResultSet results; +try +{ +results = QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, BLACKLISTED_PARTITIONS), --- End diff -- I think there is a local execution with pagination option (which should avoid oom right)? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
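A generic sketch of the paged-read-with-cap idea being suggested here, i.e. loading the blacklist in bounded pages and stopping with a warning once the cache limit is hit so a very large table cannot exhaust the heap (purely illustrative; the page size, cap, and list-backed "table" are assumptions, not Cassandra APIs):

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class PagedBlacklistLoadSketch
    {
        // Reads the source rows in fixed-size pages and stops (with a warning) once the cap is reached.
        static Set<String> loadWithCap(List<String> rows, int pageSize, int maxEntries)
        {
            Set<String> cache = new HashSet<>();
            for (int offset = 0; offset < rows.size(); offset += pageSize)
            {
                List<String> page = rows.subList(offset, Math.min(offset + pageSize, rows.size()));
                for (String key : page)
                {
                    if (cache.size() >= maxEntries)
                    {
                        System.err.println("blacklisted partitions cache is full (" + maxEntries + "); ignoring remaining rows");
                        return cache;
                    }
                    cache.add(key);
                }
            }
            return cache;
        }

        public static void main(String[] args)
        {
            List<String> rows = new ArrayList<>();
            for (int i = 0; i < 10; i++) rows.add("ks:table1:" + i);
            System.out.println(loadWithCap(rows, 3, 5).size()); // prints 5
        }
    }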
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r230864957 --- Diff: src/java/org/apache/cassandra/cache/BlacklistedPartition.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cache; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * Class to represent a blacklisted partition + */ +public class BlacklistedPartition implements IMeasurableMemory --- End diff -- Hm, not sure I agree since this is in the same package, but I don't really care :-) --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r230864731 --- Diff: doc/source/operating/blacklisting_partitions.rst --- @@ -0,0 +1,63 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at +.. +.. http://www.apache.org/licenses/LICENSE-2.0 +.. +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +.. highlight:: none + + +Blacklisting Partitions (CASSANDRA-12106) +- + +This feature allows partition keys to be blacklisted i.e. Cassandra would not process following operations on those +blacklisted partitions. + +- Point READs + +Response would be InvalidQueryException. + +It is important to note that, this would not have any effect on range reads or write operations. + +How to blacklist a partition key + +``system_distributed.blacklisted_partitions`` table can be used to blacklist partitions. Essentially, you will have to +insert a new row into the table with the following details: + +- Keyspace name +- Table name +- Partition key + +The way partition key needs to be mentioned is exactly similar to how ``nodetool getendpoints`` works. +Following are several examples for blacklisting partition keys in keyspace, say ks, and table, say table1 for different +possibilities of table1 partition key: +- For a single column partition key, say Id, + Id is a simple type - insert into system_distributed.blacklisted_partitions (keyspace_name ,columnfamily_name ,partition_key) VALUES ('ks','table1','1'); + Id is a blob- insert into system_distributed.blacklisted_partitions (keyspace_name ,columnfamily_name ,partition_key) VALUES ('ks','table1','12345f'); + Id has a colon - insert into system_distributed.blacklisted_partitions (keyspace_name ,columnfamily_name ,partition_key) VALUES ('ks','table1','1\:2'); + +- For a composite column partition key, say (Key1, Key2), + insert into system_distributed.blacklisted_partitions (keyspace_name ,columnfamily_name, partition_key) VALUES ('ks','table1','k11:k21'); + +BlacklistedPartitions Cache +^^^ +Cassandra internally maintains an on-heap cache that loads partition keys from ``system_distributed.blacklisted_partitions``. +Any partition key mentioned against a non-existent keyspace name and table name will not be cached. + +Cache gets refreshed in following ways +- During Cassandra start up +- Scheduled cache refresh at a default interval of 10 minutes (can be overridden using ``blacklisted_partitions_cache_refresh_period_in_sec`` yaml property) +- Using nodetool refreshblacklistedpartitionscache + +There is no bound on how much on-heap memory this cache can occupy - so be cautious on how many keys you would want to blacklist. +``blacklisted_partitions_cache_size_warn_threshold_in_mb`` yaml property can be used to be notified (via warning logs) if cache size exceeds the set threshold. 
--- End diff -- The docs have a `cassandra.yaml` section that you can link to for context on configuration options. I was suggesting you could potentially link to that page: http://cassandra.apache.org/doc/latest/configuration/cassandra_config_file.html --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user sumanth-pasupuleti commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r230548740 --- Diff: test/unit/org/apache/cassandra/cache/BlacklistedPartitionsCacheTest.java --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.cache; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.repair.SystemDistributedKeyspace; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.utils.ByteBufferUtil; + +public class BlacklistedPartitionsCacheTest extends CQLTester --- End diff -- makes sense. Added a test case where reading blacklist fails due to absence of blacklisted_partitions table --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user sumanth-pasupuleti commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r230548472 --- Diff: src/java/org/apache/cassandra/tools/nodetool/BlacklistedPartitions.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tools.nodetool; + +import io.airlift.airline.Command; +import io.airlift.airline.Option; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; + +@Command(name = "blacklistedpartitions", description = "Refresh blacklisted partitions cache from system_distributed.blacklisted_partitions table") +public class BlacklistedPartitions extends NodeTool.NodeToolCmd +{ +@Option(title = "refresh_cache", name = { "--refresh-cache"}, description = "Refresh blacklisted partitions cache. Default = false.") --- End diff -- done --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org
[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...
Github user sumanth-pasupuleti commented on a diff in the pull request: https://github.com/apache/cassandra/pull/277#discussion_r230548454 --- Diff: src/java/org/apache/cassandra/service/CassandraDaemon.java --- @@ -426,6 +426,9 @@ public void uncaughtException(Thread t, Throwable e) // Native transport nativeTransportService = new NativeTransportService(); +// load blacklisted partitions into cache +StorageService.instance.refreshBlacklistedPartitionsCache(); --- End diff -- can you elaborate please? not having what? --- - To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org