[GitHub] cassandra pull request #281: SEP worker shutdown

2019-01-08 Thread ifesdjeen
Github user ifesdjeen closed the pull request at:

https://github.com/apache/cassandra/pull/281


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #175: Fix: Page width exceeding the viewport width

2019-01-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/cassandra/pull/175


---




[GitHub] cassandra pull request #:

2019-01-05 Thread michaelsembwever
Github user michaelsembwever commented on the pull request:


https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847255
  
In doc/source/operating/read_repair.rst on line 24:
"fastest replica"? That's not objectively true. Maybe just drop the 
"fastest"?


---




[GitHub] cassandra pull request #:

2019-01-05 Thread michaelsembwever
Github user michaelsembwever commented on the pull request:


https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847253
  
In doc/source/operating/read_repair.rst on line 26:
Is this accurate? Read repairs initially ask only those replicas they got 
digests from (hence matching the consistency level of the request) for full 
data responses, but they can request additional replicas (a la speculative 
execution).


---




[GitHub] cassandra pull request #:

2019-01-05 Thread michaelsembwever
Github user michaelsembwever commented on the pull request:


https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847204
  
In doc/source/operating/hints.rst on line 36:
I would drop all mention of `ANY`. It's not a consistency level that the C* 
community wants to promote anymore, afaik.


---




[GitHub] cassandra pull request #:

2019-01-05 Thread michaelsembwever
Github user michaelsembwever commented on the pull request:


https://github.com/apache/cassandra/commit/f30a7dab7f2871fd4b88352f4b17129c02425484#commitcomment-31847200
  
In doc/source/operating/hints.rst on line 22:
This sentence could be more exact. Hints don't "ensure consistency" per 
se; rather, they reduce inconsistencies at rest (on disk). As explained below, 
hints take time to replay, so nothing is immediate or guaranteed. In practice, 
though, they minimise inconsistencies on disk when nodes are not down for 
longer than the max hint window.


---




[GitHub] cassandra pull request #297: Update URLs for gitbox.apache.org migration

2019-01-04 Thread mshuler
GitHub user mshuler opened a pull request:

https://github.com/apache/cassandra/pull/297

Update URLs for gitbox.apache.org migration



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mshuler/cassandra git-wip2gitbox

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/297.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #297


commit 9fd6659633f87b32fde2704346ac156806d19a67
Author: Michael Shuler 
Date:   2019-01-04T20:50:29Z

Update URLs for gitbox.apache.org migration




---




[GitHub] cassandra pull request #296: Missing 'hppc' transitive dependency in 'cassan...

2018-12-20 Thread jojczykp
GitHub user jojczykp opened a pull request:

https://github.com/apache/cassandra/pull/296

Missing 'hppc' transitive dependency in 'cassandra-all' 

When I run:
`$ ant mvn-install`
I get, as expected, cassandra-all-4.0-SNAPSHOT generated and installed in 
my local repo.

Unfortunately, to make use of it as a dependency in my gradle project, I 
also have to add `com.carrotsearch:hppc:0.5.4` as a dependency in that project.

As hppc is required by Cassandra, I'd have expected it to be resolved 
automatically as a transitive dependency and appear in my project without any 
extra effort (the same way it works for all other Cassandra deps, like 
joda-time, sigar, etc.)

This PR makes it work :)
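For reference, the coordinates mentioned above (`com.carrotsearch:hppc:0.5.4`) 
would correspond to a POM dependency entry along these lines; the exact form 
generated from `build.xml` may differ:

```xml
<dependency>
  <groupId>com.carrotsearch</groupId>
  <artifactId>hppc</artifactId>
  <version>0.5.4</version>
</dependency>
```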

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jojczykp/cassandra 
missing-hppc-transitive-dependency-in-produced-pom

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/296.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #296


commit 57f8fa1ff77aebedc397b33f06f83ba1e6c4690b
Author: jojczykp 
Date:   2018-12-20T22:23:37Z

Missing 'hppc' transitive dependency in 'cassandra-all' generated with 
'maven-install'




---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-12 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r241232030
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
--- End diff --

From my brief reading of the JNI bindings, it seems that zstd-jni uses 
`ZSTD_compress` and not `ZSTD_compressCCtx`. The latter allows you to pass in a 
context, which would have been more efficient. For now, I think it would be OK 
to simply pass in the compression level. The only value of keeping multiple 
compressor objects around is to retain the `CompressionParams`. It would be 
useful to have a caching factory to avoid creating multiple objects with the 
same compression params.

As far as thread safety goes, the JNI code looks thread safe, specifically the 
compress and decompress methods. As pointed out earlier, they create and 
destroy the compression context on each invocation, which is memory unfriendly. 
So although the zstd benchmarks may look great, I am not so sure about this JNI 
binding.

We should definitely add a JMH benchmark for zstd.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-12 Thread dineshjoshi
Github user dineshjoshi commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r241224459
  
--- Diff: build.xml ---
@@ -412,6 +412,7 @@
   
   
   
+  
--- End diff --

The latest version is `1.3.7`. Let's upgrade to it.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-11 Thread sushmaad
Github user sushmaad commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240890567
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
+    }
+
+    private static void validateCompressionLevel(int compressionLevel)
+    {
+        if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
--- End diff --

Zstd accepts integer levels from negative infinity up to 22. Levels 1 to 22 
are normal levels, ranging from fastest at level 1 to highest compression ratio 
at level 22. Level 0 selects the default compression level (which is level 3). 
Negative levels are a new addition and are even faster than level 1, getting 
faster the more negative you go.
Negative compression levels can definitely be supported here.
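A rough sketch of what a widened validation could look like; the bound 
constants here are illustrative assumptions for this sketch, not values taken 
from the zstd-jni API:

```java
// Sketch: level validation that also admits zstd's negative (fast) levels.
// MIN_LEVEL is a hypothetical lower bound chosen for illustration; a real
// patch would query the bounds from the zstd library itself.
final class ZstdLevels
{
    static final int MIN_LEVEL = -131072; // assumed lower bound for fast levels
    static final int MAX_LEVEL = 22;      // highest-ratio normal level
    static final int DEFAULT_LEVEL = 3;   // what level 0 maps to in zstd

    // Returns the level unchanged when valid, otherwise throws.
    static int validate(int level)
    {
        if (level < MIN_LEVEL || level > MAX_LEVEL)
            throw new IllegalArgumentException("ZSTD compression_level " + level + " invalid");
        return level;
    }

    private ZstdLevels() {}
}
```

With this shape, a user-supplied `compression_level` of, say, `-5` would pass 
validation and be handed straight to the compressor.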


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-11 Thread sushmaad
Github user sushmaad commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240889512
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
--- End diff --

@jolynch @alexott indeed compression_level is currently always set to the 
default. I will refactor this a bit so that each CF has CompressionParams with 
a dedicated compressor, by removing the static reference.
It seems to me that there is a tradeoff between having a static reference with 
statefulness based on compression_level versus just creating a compressor per 
CompressionParams.
Having individual compressors might increase the number of compressor objects 
floating around, but we gain the flexibility of using compression_level 
directly without any complicated pre-determined conditions. Let me know what 
you think.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-11 Thread sushmaad
Github user sushmaad commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240885996
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
+    }
+
+    private static void validateCompressionLevel(int compressionLevel)
+    {
+        if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
+        {
+            throw new IllegalArgumentException(
+                "ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ",
+                null
+            );
+        }
+    }
+
+    private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+    {
+        return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+                                                                Integer.toString(DEFAULT_LEVEL)));
+    }
+
+    @Override
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return (int) Zstd.compressBound(chunkLength);
+    }
+
+    @Override
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        long decompressSize = Zstd.decompressByteArray(output,
+                                                       outputOffset,
+                                                       output.length - outputOffset,
+                                                       input,
+                                                       inputOffset,
+                                                       inputLength);
+        if (Zstd.isError(decompressSize))
+        {
+            throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize));
+        }
+
+        return (int) decompressSize;
+    }
+
+    @Override
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        Zstd.decompress(output, input);
+    }
+
+    @Override
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
--- End diff --

Yes, will do that; it seems to simplify the code here.


---




[GitHub] cassandra pull request #295: add missing commands to nodetool-completion (CA...

2018-12-11 Thread carlo4002
GitHub user carlo4002 opened a pull request:

https://github.com/apache/cassandra/pull/295

add missing commands to nodetool-completion (CASSANDRA-14916)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/carlo4002/cassandra trunk

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #295


commit 6f4c22a6af19421d113432837b5f16b6db830439
Author: jean carlo rivera ura 
Date:   2018-12-11T08:40:14Z

add missing commands to nodetool-completion (CASSANDRA-14916)




---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240495812
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
+    }
+
+    private static void validateCompressionLevel(int compressionLevel)
+    {
+        if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
+        {
+            throw new IllegalArgumentException(
+                "ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ",
+                null
+            );
+        }
+    }
+
+    private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+    {
+        return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+                                                                Integer.toString(DEFAULT_LEVEL)));
+    }
+
+    @Override
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return (int) Zstd.compressBound(chunkLength);
+    }
+
+    @Override
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        long decompressSize = Zstd.decompressByteArray(output,
+                                                       outputOffset,
+                                                       output.length - outputOffset,
+                                                       input,
+                                                       inputOffset,
+                                                       inputLength);
+        if (Zstd.isError(decompressSize))
+        {
+            throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize));
+        }
+
+        return (int) decompressSize;
+    }
+
+    @Override
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        Zstd.decompress(output, input);
+    }
+
+    @Override
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
--- End diff --

Indeed, but I think you can simplify this implementation by using the built-in 
`Zstd.compress` method I linked above, which afaict does what the code here 
does.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread sushmaad
Github user sushmaad commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240494855
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
+    }
+
+    private static void validateCompressionLevel(int compressionLevel)
+    {
+        if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
+        {
+            throw new IllegalArgumentException(
+                "ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ",
+                null
+            );
+        }
+    }
+
+    private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+    {
+        return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+                                                                Integer.toString(DEFAULT_LEVEL)));
+    }
+
+    @Override
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return (int) Zstd.compressBound(chunkLength);
+    }
+
+    @Override
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        long decompressSize = Zstd.decompressByteArray(output,
+                                                       outputOffset,
+                                                       output.length - outputOffset,
+                                                       input,
+                                                       inputOffset,
+                                                       inputLength);
+        if (Zstd.isError(decompressSize))
+        {
+            throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize));
+        }
+
+        return (int) decompressSize;
+    }
+
+    @Override
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        Zstd.decompress(output, input);
+    }
+
+    @Override
+    public void compress(ByteBuffer input, ByteBuffer output) throws IOException
--- End diff --

The `ICompressor` interface supports only `compress(ByteBuffer input, 
ByteBuffer output)`.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240344277
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
--- End diff --

Sorry, just to clarify: this is a Cassandra-level thing, not a Zstd-level 
thing. Cassandra calls the `create` method via reflection 
[here](https://github.com/apache/cassandra/blob/caf50de31b034ed77140b3c1597e7ca6ddc44e17/src/java/org/apache/cassandra/schema/CompressionParams.java#L288-L289), 
and since the `ICompressors` are "stateless" we can make a reasonable cache 
using a `ConcurrentHashMap`, keyed by parameters, like 
[LZ4Compressor](https://github.com/apache/cassandra/blob/06209037ea56b5a2a49615a99f1542d6ea1b2947/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java#L61) 
does. This assumes of course that the `Zstd` APIs are thread safe, but I'm 
pretty confident they are.
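The caching pattern described above can be sketched roughly as follows. 
`Compressor` here is a hypothetical stand-in for `ZSTDCompressor`, and for 
simplicity the cache is keyed by a bare compression level; a real 
implementation would key on the full compression parameters:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of a per-parameter compressor cache in the spirit of
// LZ4Compressor's ConcurrentHashMap of instances.
final class CompressorCache
{
    // Hypothetical stand-in for the real ZSTDCompressor.
    static final class Compressor
    {
        final int level;
        Compressor(int level) { this.level = level; }
    }

    private static final Map<Integer, Compressor> instances = new ConcurrentHashMap<>();

    // Identical options yield the same cached instance; computeIfAbsent
    // makes the get-or-create step atomic under concurrent callers.
    static Compressor create(int level)
    {
        return instances.computeIfAbsent(level, Compressor::new);
    }

    private CompressorCache() {}
}
```

Repeated `create(3)` calls then return the same object, so the number of 
compressor instances is bounded by the number of distinct parameter sets.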


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240341941
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+    public static final int FAST_COMPRESSION = 1; // fastest compression time
+    public static final int DEFAULT_LEVEL = 3;
+    public static final int BEST_COMPRESSION = 22; // very good compression ratio
+    private static final ZSTDCompressor instance = new ZSTDCompressor();
+    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
+    @VisibleForTesting
+    protected static final int compressionLevel = DEFAULT_LEVEL;
+
+    public static ZSTDCompressor create(Map<String, String> compressionOptions)
+    {
+        validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+        return instance;
+    }
+
+    private static void validateCompressionLevel(int compressionLevel)
+    {
+        if (compressionLevel < FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
+        {
+            throw new IllegalArgumentException(
+                "ZSTD compression_level " + Integer.toString(compressionLevel) + " invalid ",
+                null
+            );
+        }
+    }
+
+    private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+    {
+        return Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+                                                                Integer.toString(DEFAULT_LEVEL)));
+    }
+
+    @Override
+    public int initialCompressedBufferLength(int chunkLength)
+    {
+        return (int) Zstd.compressBound(chunkLength);
+    }
+
+    @Override
+    public int uncompress(byte[] input, int inputOffset, int inputLength, byte[] output, int outputOffset) throws IOException
+    {
+        long decompressSize = Zstd.decompressByteArray(output,
+                                                       outputOffset,
+                                                       output.length - outputOffset,
+                                                       input,
+                                                       inputOffset,
+                                                       inputLength);
+        if (Zstd.isError(decompressSize))
+        {
+            throw new IOException("ZSTD uncompress failed with error reason " + Zstd.getErrorName(decompressSize));
+        }
+
+        return (int) decompressSize;
+    }
+
+    @Override
+    public void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
+    {
+        Zstd.decompress(output, input);
--- End diff --

My reading of the [`decompress(ByteBuffer dstBuffer, ByteBuffer srcBuf)`](https://github.com/luben/zstd-jni/blob/c642dc38eddd9142c55644034e6485444a25f052/src/main/java/com/github/luben/zstd/Zstd.java#L879) method used here is that it calls `Zstd.isError` and throws a `RuntimeException`. To be consistent with the `ICompressor` contract, these methods should afaict throw `IOException` instead.
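A minimal, self-contained sketch of the exception wrapping this comment asks for. The `nativeDecompress` stand-in below is hypothetical (it mimics zstd-jni signaling failure via an unchecked exception); the wrapper converts that into the checked `IOException` the `ICompressor` contract expects:

```java
import java.io.IOException;

public class ExceptionAdapter {
    // Hypothetical stand-in for a native decompress call that reports
    // failure by throwing an unchecked RuntimeException.
    static int nativeDecompress(boolean fail) {
        if (fail) throw new RuntimeException("Corrupted block detected");
        return 42; // pretend decompressed size in bytes
    }

    // Catch the unchecked failure and rethrow it as the checked
    // IOException required by the interface contract, preserving the cause.
    public static int uncompress(boolean fail) throws IOException {
        try {
            return nativeDecompress(fail);
        } catch (RuntimeException e) {
            throw new IOException("ZSTD uncompress failed: " + e.getMessage(), e);
        }
    }
}
```

The same try/catch shape would apply around `Zstd.decompress(output, input)` in the ByteBuffer overload discussed above.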


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r240295987
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
@@ -21,133 +21,505 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
 
-import com.codahale.metrics.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this class.
  */
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-    private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+    private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+    // Latency measurement and ranking. The samples contain latency measurements and the scores are used for ranking
+    protected boolean registered = false;
+    protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+    protected volatile Map scores = new HashMap<>();
 
-    private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values
-    private static final int WINDOW_SIZE = 100;
+    protected final Map samples = new ConcurrentHashMap<>();
 
-    private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
-    private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
-    private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+    // Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking
+    public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+    public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L);
+    // The probe rate is set later when configuration is read in applyConfigChanges
+    protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
+    protected static final DebuggableScheduledThreadPoolExecutor latencyProbe
[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r240308095
  
--- Diff: src/java/org/apache/cassandra/net/ResponseVerbHandler.java ---
@@ -49,7 +49,7 @@ public void doVerb(MessageIn message, int id)
 else
 {
 //TODO: Should we add latency only in success cases?
--- End diff --

Do you think we should remove this TODO while we are here?


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239884316
  
--- Diff: src/java/org/apache/cassandra/net/MessagingService.java ---
@@ -729,20 +730,26 @@ void markTimeout(InetAddressAndPort addr)
     /**
      * Track latency information for the dynamic snitch
      *
-     * @param cb  the callback associated with this message -- this lets us know if it's a message type we're interested in
-     * @param address the host that replied to the message
-     * @param latency
+     * @param cb            the callback associated with this message -- this lets us know if it's a message type we're interested in
+     * @param address       the host that replied to the message
+     * @param latencyMicros the number of microseconds to record for this host
      */
-    public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, long latency)
+    public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort address, long latencyMicros)
     {
-        if (cb.isLatencyForSnitch())
-            addLatency(address, latency);
+        if (cb.latencyMeasurementType() != LatencyMeasurementType.IGNORE)
+            addLatency(address, latencyMicros, cb.latencyMeasurementType());
     }
 
-    public void addLatency(InetAddressAndPort address, long latency)
+    // Used on the local read path
+    public void addLatency(InetAddressAndPort address, long latencyMicros)
+    {
+        addLatency(address, latencyMicros, LatencyMeasurementType.READ);
+    }
+
+    private void addLatency(InetAddressAndPort address, long latencyMicros, LatencyMeasurementType usable)
--- End diff --

Is it usable or is it a type? It's not a boolean anymore.
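A sketch of the callback pattern under discussion, where the old boolean `isLatencyForSnitch()` becomes an enum-typed measurement classification. `READ` and `IGNORE` appear in the quoted diff; the `PROBE` constant and the demo class below are illustrative assumptions:

```java
import java.util.HashMap;
import java.util.Map;

public class LatencyDemo {
    // Classification of a latency sample rather than a yes/no flag.
    public enum LatencyMeasurementType { READ, PROBE, IGNORE }

    public static final Map<String, Long> recorded = new HashMap<>();

    // Record the sample unless the callback marked it IGNORE,
    // mirroring the maybeAddLatency guard in the diff above.
    public static void maybeAddLatency(LatencyMeasurementType type, String address, long latencyMicros) {
        if (type != LatencyMeasurementType.IGNORE)
            recorded.put(address, latencyMicros);
    }
}
```

An enum keeps the signature open to further measurement kinds without another boolean parameter, which is presumably why the reviewer questions naming the parameter `usable`.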


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239900862
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239906662
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239899040
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r240288561
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r240311669
  
--- Diff: test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---
@@ -18,35 +18,99 @@
 
 package org.apache.cassandra.locator;
 
-import java.io.IOException;
+import java.net.UnknownHostException;
--- End diff --

My code coverage is a bit wonky today, but is sendPingMessage tested? Is it 
possible?


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239953888
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r240308899
  
--- Diff: src/java/org/apache/cassandra/service/StorageServiceMBean.java ---
@@ -463,21 +463,33 @@
    public Map<String, String> getViewBuildStatusesWithPort(String keyspace, String view);
 
 /**
- * Change endpointsnitch class and dynamic-ness (and dynamic 
attributes) at runtime.
+ * Change endpointsnitch class and dynamicsnitch class at runtime.
  *
  * This method is used to change the snitch implementation and/or 
dynamic snitch parameters.
  * If {@code epSnitchClassName} is specified, it will configure a new 
snitch instance and make it a
- * 'dynamic snitch' if {@code dynamic} is specified and {@code true}.
+ * 'dynamic snitch' if {@code dynamicSnitchClassName} is not null.
  *
- * The parameters {@code dynamicUpdateInterval}, {@code 
dynamicResetInterval} and {@code dynamicBadnessThreshold}
+ * The parameters {@code dynamicUpdateInterval}, {@code 
dynamicSampleUpdateInterval} and {@code dynamicBadnessThreshold}
  * can be specified individually to update the parameters of the 
dynamic snitch during runtime.
  *
- * @param epSnitchClassNamethe canonical path name for a class 
implementing IEndpointSnitch
- * @param dynamic  boolean that decides whether 
dynamicsnitch is used or not - only valid, if {@code epSnitchClassName} is 
specified
- * @param dynamicUpdateIntervalinteger, in ms (defaults to the 
value configured in cassandra.yaml, which defaults to 100)
- * @param dynamicResetInterval integer, in ms (defaults to the 
value configured in cassandra.yaml, which defaults to 600,000)
- * @param dynamicBadnessThreshold  double, (defaults to the value 
configured in cassandra.yaml, which defaults to 0.0)
+ * @param epSnitchClassNamethe canonical path name for a 
class implementing IEndpointSnitch or null.
+ * If null then no snitch change 
is made. If an empty string the existing
+ * Snitch class is used.
+ * @param dynamicSnitchClassName   the canonical path name for a 
class extending DynamicEndpointSnitch. If
+ * null while epSnitchClassName is 
not null, this turns off dynamic snitching;
+ * otherwise just settings are 
updated. If an empty string is passed then
+ * dynamic snitching is kept with 
the default implementation.
+ * @param dynamicUpdateIntervalinteger, in ms (defaults to the 
value configured in cassandra.yaml, which defaults to 100)
+ * @param dynamicSampleUpdateInterval  integer, in ms (defaults to the 
value configured in cassandra.yaml, which defaults to 1,000)
--- End diff --

Documenting defaults in too many locations means the docs are likely to 
become obsolete, such as when you fix the bug with the probe rate.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239939483
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -21,133 +21,505 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
 
-import com.codahale.metrics.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import 
org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi 
failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) 
are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this 
class.
  */
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-private static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+private static final Logger logger = 
LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+// Latency measurement and ranking. The samples contain latency 
measurements and the scores are used for ranking
+protected boolean registered = false;
+protected static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
 
-private static final double ALPHA = 0.75; // set to 0.75 to make EDS 
more biased to towards the newer values
-private static final int WINDOW_SIZE = 100;
+protected final Map samples 
= new ConcurrentHashMap<>();
 
-private volatile int dynamicUpdateInterval = 
DatabaseDescriptor.getDynamicUpdateInterval();
-private volatile int dynamicResetInterval = 
DatabaseDescriptor.getDynamicResetInterval();
-private volatile double dynamicBadnessThreshold = 
DatabaseDescriptor.getDynamicBadnessThreshold();
+// Latency probe functionality for actively probing endpoints that we 
haven't measured recently but are ranking
+public static final long MAX_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+public static final long MIN_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ;
+// The probe rate is set later when configuration is read in 
applyConfigChanges
+protected static final RateLimiter probeRateLimiter = 
RateLimiter.create(1);
+protected static final DebuggableScheduledThreadPoolExecutor 
latencyProbe

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r240299652
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -214,19 +600,27 @@ public String getDatacenter(InetAddressAndPort 
endpoint)
 {
 if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + 
dynamicBadnessThreshold)))
 {
-return sortedByProximityWithScore(address, replicas);
+return sortedByProximityWithScore(address, replicas, 
aliasedScores);
 }
 }
 
 return replicas;
 }
 
 // Compare endpoints given an immutable snapshot of the scores
-private int compareEndpoints(InetAddressAndPort target, Replica a1, 
Replica a2, Map scores)
+public int compareEndpoints(InetAddressAndPort target, Replica a1, 
Replica a2)
--- End diff --

This comment seems wrong, and the entire scenario is a little confusing. 
Why can't this be removed?


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r240311277
  
--- Diff: 
test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---
@@ -18,35 +18,99 @@
 
 package org.apache.cassandra.locator;
 
-import java.io.IOException;
+import java.net.UnknownHostException;
--- End diff --

It looks like shutting down the DES is covered a little in the settings 
test, but it would be good if it were validated somewhere that the futures 
are cleaned up and there are no scheduled/running tasks in the executors.
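Such a check could look roughly like this (a self-contained sketch against a plain `ScheduledThreadPoolExecutor`; the class and method names here are made up, and the real test would drive the DES's own executor and probe future):

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SnitchShutdownSketch
{
    // Simulates what a DES close() should guarantee: the probe future is
    // cancelled and the executor has no queued or running tasks left.
    public static boolean shutdown(ScheduledThreadPoolExecutor executor, ScheduledFuture<?> probeFuture)
        throws InterruptedException
    {
        if (probeFuture != null)
            probeFuture.cancel(true);
        executor.shutdown();
        return executor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception
    {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(true); // drop cancelled probes from the queue immediately
        ScheduledFuture<?> probe = executor.scheduleAtFixedRate(() -> {}, 0, 100, TimeUnit.MILLISECONDS);

        boolean terminated = shutdown(executor, probe);

        // The assertions the reviewer asks for: nothing scheduled, nothing running
        if (!terminated || !probe.isCancelled())
            throw new AssertionError("probe future not cleaned up");
        if (!executor.getQueue().isEmpty() || executor.getActiveCount() != 0)
            throw new AssertionError("executor still has scheduled/running tasks");
        System.out.println("executor cleanly shut down");
    }
}
```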


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239884211
  
--- Diff: src/java/org/apache/cassandra/net/MessagingService.java ---
@@ -729,20 +730,26 @@ void markTimeout(InetAddressAndPort addr)
 /**
  * Track latency information for the dynamic snitch
  *
- * @param cb  the callback associated with this message -- this 
lets us know if it's a message type we're interested in
- * @param address the host that replied to the message
- * @param latency
+ * @param cbthe callback associated with this message -- 
this lets us know if it's a message type we're interested in
+ * @param address   the host that replied to the message
+ * @param latencyMicros the number of microseconds to record for this 
host
  */
-public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort 
address, long latency)
+public void maybeAddLatency(IAsyncCallback cb, InetAddressAndPort 
address, long latencyMicros)
 {
-if (cb.isLatencyForSnitch())
-addLatency(address, latency);
+if (cb.latencyMeasurementType() != LatencyMeasurementType.IGNORE)
+addLatency(address, latencyMicros, 
cb.latencyMeasurementType());
 }
 
-public void addLatency(InetAddressAndPort address, long latency)
+// Used on the local read path
+public void addLatency(InetAddressAndPort address, long latencyMicros)
--- End diff --

It's a bit implicit that this is exclusively for adding read latency?


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-10 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r239897722
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -21,133 +21,505 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
 
-import com.codahale.metrics.Snapshot;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import 
org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MBeanWrapper;
 
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi 
failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) 
are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this 
class.
  */
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-private static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+private static final Logger logger = 
LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+// Latency measurement and ranking. The samples contain latency 
measurements and the scores are used for ranking
+protected boolean registered = false;
--- End diff --

Should probably have a more specific name, similar to how mbeanRegistered 
specifies what is registered.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread alexott
Github user alexott commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240125195
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map<String, String> compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
+}
+
+private static void validateCompressionLevel(int compressionLevel)
+{
+if (compressionLevel < FAST_COMPRESSION || compressionLevel > 
BEST_COMPRESSION)
+{
+throw new IllegalArgumentException(
+"ZSTD compression_level " + 
Integer.toString(compressionLevel) + " invalid ",
+null
+);
+}
+}
+
+private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+{
+return 
Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+
Integer.toString(DEFAULT_LEVEL)));
+}
+
+@Override
+public int initialCompressedBufferLength(int chunkLength)
+{
+return (int)Zstd.compressBound(chunkLength);
+}
+
+@Override
+public int uncompress(byte[] input, int inputOffset, int inputLength, 
byte[] output, int outputOffset)
+throws IOException
+{
+long decompressSize = Zstd.decompressByteArray(output,
+   outputOffset,
+   output.length - 
outputOffset,
+   input,
+   inputOffset,
+   inputLength);
+if (Zstd.isError(decompressSize))
+{
+throw new IOException("ZSTD uncompress failed with error 
reason " + Zstd.getErrorName(decompressSize));
+}
+
+return (int) decompressSize;
+}
+
+@Override
+public void uncompress(ByteBuffer input, ByteBuffer output) throws 
IOException
+{
+Zstd.decompress(output, input);
+}
+
+@Override
+public void compress(ByteBuffer input, ByteBuffer output) throws 
IOException
+{
+if (!input.isDirect())
+{
+throw new IllegalArgumentException("input must be a direct 
buffer");
+}
+
+if (!output.isDirect())
+{
+throw new IllegalArgumentException("output must be a direct 
buffer");
+}
+long compressedSize = Zstd.compressDirectByteBuffer(output,
+
output.position(),
+output.limit() 
- output.position(),
+inp

[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread alexott
Github user alexott commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240125007
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map<String, String> compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
+}
+
+private static void validateCompressionLevel(int compressionLevel)
+{
+if (compressionLevel < FAST_COMPRESSION || compressionLevel > 
BEST_COMPRESSION)
+{
+throw new IllegalArgumentException(
+"ZSTD compression_level " + 
Integer.toString(compressionLevel) + " invalid ",
+null
+);
+}
+}
+
+private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+{
+return 
Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+
Integer.toString(DEFAULT_LEVEL)));
+}
+
+@Override
+public int initialCompressedBufferLength(int chunkLength)
+{
+return (int)Zstd.compressBound(chunkLength);
+}
+
+@Override
+public int uncompress(byte[] input, int inputOffset, int inputLength, 
byte[] output, int outputOffset)
+throws IOException
+{
+long decompressSize = Zstd.decompressByteArray(output,
+   outputOffset,
+   output.length - 
outputOffset,
+   input,
+   inputOffset,
+   inputLength);
+if (Zstd.isError(decompressSize))
+{
+throw new IOException("ZSTD uncompress failed with error 
reason " + Zstd.getErrorName(decompressSize));
+}
+
+return (int) decompressSize;
+}
+
+@Override
+public void uncompress(ByteBuffer input, ByteBuffer output) throws 
IOException
+{
+Zstd.decompress(output, input);
--- End diff --

The [decompress 
functions](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/Zstd.java#L189)
 don't throw exceptions; the result should be checked the same way as 
above, with `Zstd.isError`.
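For reference, zstd's convention is to encode failures in the (negative) return value rather than throw. A self-contained sketch of the suggested check, with stub `isError`/`getErrorName` methods standing in for the real `com.github.luben.zstd.Zstd` calls:

```java
import java.io.IOException;

public class ZstdErrorCheckSketch
{
    // Stand-ins for Zstd.isError / Zstd.getErrorName (assumption: the real
    // calls come from com.github.luben.zstd.Zstd)
    public static boolean isError(long code) { return code < 0; }
    public static String getErrorName(long code) { return "error code " + code; }

    // Hypothetical wrapper applying the reviewer's suggestion: check the
    // return code instead of assuming an exception is thrown
    public static int checkedDecompress(long rawResult) throws IOException
    {
        if (isError(rawResult))
            throw new IOException("ZSTD uncompress failed with error reason " + getErrorName(rawResult));
        return (int) rawResult;
    }

    public static void main(String[] args) throws IOException
    {
        System.out.println(checkedDecompress(42));   // prints 42
        try
        {
            checkedDecompress(-1);
            throw new AssertionError("expected IOException");
        }
        catch (IOException expected)
        {
            System.out.println("caught: " + expected.getMessage());
        }
    }
}
```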


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread alexott
Github user alexott commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240124308
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map<String, String> compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
+}
+
+private static void validateCompressionLevel(int compressionLevel)
+{
+if (compressionLevel < FAST_COMPRESSION || compressionLevel > 
BEST_COMPRESSION)
--- End diff --

ZStd also supports negative compression levels that are fast but at cost of 
compression:

> The library supports regular compression levels from 1 up to 
ZSTD_maxCLevel(),
  which is currently 22. Levels >= 20, labeled `--ultra`, should be used 
with
  caution, as they require more memory. The library also offers negative
  compression levels, which extend the range of speed vs. ratio preferences.
  The lower the level, the faster the speed (at the cost of compression).

I think that it makes sense to add them as well. I think that the maximal 
negative value is 17...
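Relaxing the validation to admit negative levels might look like this (a sketch; `MIN_FAST_COMPRESSION = -17` reflects the bound mentioned above and is an assumption, not a value taken from the zstd-jni API):

```java
public class ZstdLevelSketch
{
    public static final int MIN_FAST_COMPRESSION = -17; // reviewer-suggested lower bound (assumption)
    public static final int BEST_COMPRESSION = 22;

    // Same shape as the PR's validateCompressionLevel, with the lower bound
    // widened to accept the negative "fast" levels
    public static void validateCompressionLevel(int compressionLevel)
    {
        if (compressionLevel < MIN_FAST_COMPRESSION || compressionLevel > BEST_COMPRESSION)
            throw new IllegalArgumentException("ZSTD compression_level " + compressionLevel + " invalid");
    }

    public static void main(String[] args)
    {
        validateCompressionLevel(-5);  // now accepted
        validateCompressionLevel(22);  // still accepted
        try
        {
            validateCompressionLevel(23);
            throw new AssertionError("expected IllegalArgumentException");
        }
        catch (IllegalArgumentException expected)
        {
            System.out.println("rejected level 23 as before");
        }
    }
}
```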


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-10 Thread alexott
Github user alexott commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240123941
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map 
compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
--- End diff --

Yes, right now the configuration parameter isn't used at all...
Regarding the ConcurrentHashMap - it makes sense only when [explicit 
context](http://facebook.github.io/zstd/zstd_manual.html#Chapter5) is used, but 
this functionality isn't exposed by the JNI bindings yet.
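For what it's worth, the per-level instance cache being discussed could be sketched as below (hypothetical class and names; a real compressor would feed the level into the Zstd calls):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PerLevelCompressorSketch
{
    private final int level;

    private PerLevelCompressorSketch(int level) { this.level = level; }

    public int level() { return level; }

    private static final Map<Integer, PerLevelCompressorSketch> instances = new ConcurrentHashMap<>();

    // One shared instance per configured compression level, instead of a
    // single singleton that ignores the configured level
    public static PerLevelCompressorSketch create(int level)
    {
        return instances.computeIfAbsent(level, PerLevelCompressorSketch::new);
    }

    public static void main(String[] args)
    {
        PerLevelCompressorSketch a = create(3);
        PerLevelCompressorSketch b = create(3);
        PerLevelCompressorSketch c = create(9);
        if (a != b) throw new AssertionError("same level should share an instance");
        if (a == c) throw new AssertionError("different levels should not share");
        System.out.println("level 3 instance reused; level 9 distinct");
    }
}
```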


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240091916
  
--- Diff: build.xml ---
@@ -412,6 +412,7 @@
   
   
   
+  
--- End diff --

Nit: might be worth grabbing the latest version that was recently released. 
I think latest is 1.3.7-2 or 3.


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240095058
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map<String, String> compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
+}
+
+private static void validateCompressionLevel(int compressionLevel)
+{
+if (compressionLevel < FAST_COMPRESSION || compressionLevel > 
BEST_COMPRESSION)
+{
+throw new IllegalArgumentException(
+"ZSTD compression_level " + 
Integer.toString(compressionLevel) + " invalid ",
+null
+);
+}
+}
+
+private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+{
+return 
Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+
Integer.toString(DEFAULT_LEVEL)));
+}
+
+@Override
+public int initialCompressedBufferLength(int chunkLength)
+{
+return (int)Zstd.compressBound(chunkLength);
+}
+
+@Override
+public int uncompress(byte[] input, int inputOffset, int inputLength, 
byte[] output, int outputOffset)
+throws IOException
+{
+long decompressSize = Zstd.decompressByteArray(output,
+   outputOffset,
+   output.length - 
outputOffset,
+   input,
+   inputOffset,
+   inputLength);
+if (Zstd.isError(decompressSize))
+{
+throw new IOException("ZSTD uncompress failed with error 
reason " + Zstd.getErrorName(decompressSize));
+}
+
+return (int) decompressSize;
+}
+
+@Override
+public void uncompress(ByteBuffer input, ByteBuffer output) throws 
IOException
+{
+Zstd.decompress(output, input);
--- End diff --

I think this needs to catch the 
[`RuntimeException`](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/Zstd.java#L895)
 and rethrow as an `IOException`.
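
The wrap-and-rethrow the reviewer suggests can be sketched as follows. This is a minimal, self-contained illustration: `nativeDecompress` is a hypothetical stand-in for zstd-jni's `Zstd.decompress(output, input)`, which signals failure with a `RuntimeException`.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

// Sketch of the suggested pattern: zstd-jni reports errors via RuntimeException,
// which an ICompressor implementation should surface as an IOException.
// nativeDecompress is a stand-in that simulates a failing native call.
class RethrowSketch
{
    static void nativeDecompress(ByteBuffer output, ByteBuffer input)
    {
        throw new RuntimeException("Output buffer too small"); // simulate a zstd error
    }

    static void uncompress(ByteBuffer input, ByteBuffer output) throws IOException
    {
        try
        {
            nativeDecompress(output, input);
        }
        catch (RuntimeException e)
        {
            // keep the ICompressor contract: failures come out as IOException
            throw new IOException("ZSTD uncompress failed", e);
        }
    }
}
```

The same shape applies to the ByteBuffer `compress` path discussed later in the thread.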


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240093947
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map<String, String> compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
--- End diff --

If we want to support different sstables/column families having different 
levels I think that this needs to be something closer to what the LZ4 
compressor does where it creates a `ConcurrentHashMap` of instances keyed by 
parameter rather than just one instance. In practice I _think_ this will lead 
to every compressor having the default level (might want to TDD and ensure that 
different parameters yield different instances).
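
The per-parameter instance cache the reviewer describes can be sketched with `ConcurrentHashMap.computeIfAbsent`, keyed by compression level. The option name and default mirror the patch, but the class itself is a simplified stand-in, not the real compressor:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the LZ4Compressor-style approach: one shared compressor instance
// per distinct compression level, created on demand. Simplified stand-in class.
class LevelKeyedCompressor
{
    private static final String COMPRESSION_LEVEL_OPTION_NAME = "compression_level";
    private static final int DEFAULT_LEVEL = 3;
    private static final Map<Integer, LevelKeyedCompressor> instances = new ConcurrentHashMap<>();

    final int compressionLevel;

    private LevelKeyedCompressor(int compressionLevel)
    {
        this.compressionLevel = compressionLevel;
    }

    static LevelKeyedCompressor create(Map<String, String> options)
    {
        int level = Integer.parseInt(options.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
                                                          Integer.toString(DEFAULT_LEVEL)));
        // computeIfAbsent returns the same instance for repeated requests at one level
        return instances.computeIfAbsent(level, LevelKeyedCompressor::new);
    }
}
```

With this shape, two tables configured with the same level share an instance, while different levels get different instances, which is exactly what the suggested test would verify.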


---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240094348
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map<String, String> compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
+}
+
+private static void validateCompressionLevel(int compressionLevel)
+{
+if (compressionLevel < FAST_COMPRESSION || compressionLevel > 
BEST_COMPRESSION)
+{
+throw new IllegalArgumentException(
+"ZSTD compression_level " + 
Integer.toString(compressionLevel) + " invalid ",
+null
+);
+}
+}
+
+private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+{
+return 
Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+
Integer.toString(DEFAULT_LEVEL)));
+}
+
+@Override
+public int initialCompressedBufferLength(int chunkLength)
+{
+return (int)Zstd.compressBound(chunkLength);
+}
+
+@Override
+public int uncompress(byte[] input, int inputOffset, int inputLength, 
byte[] output, int outputOffset)
+throws IOException
+{
+long decompressSize = Zstd.decompressByteArray(output,
+   outputOffset,
+   output.length - 
outputOffset,
+   input,
+   inputOffset,
+   inputLength);
+if (Zstd.isError(decompressSize))
+{
+throw new IOException("ZSTD uncompress failed with error 
reason " + Zstd.getErrorName(decompressSize));
+}
+
+return (int) decompressSize;
+}
+
+@Override
+public void uncompress(ByteBuffer input, ByteBuffer output) throws 
IOException
+{
+Zstd.decompress(output, input);
+}
+
+@Override
+public void compress(ByteBuffer input, ByteBuffer output) throws 
IOException
+{
+if (!input.isDirect())
+{
+throw new IllegalArgumentException("input must be a direct 
buffer");
+}
+
+if (!output.isDirect())
+{
+throw new IllegalArgumentException("output must be a direct 
buffer");
+}
+long compressedSize = Zstd.compressDirectByteBuffer(output,
+
output.position(),
+output.limit() 
- output.position(),
+inp

[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-12-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/292#discussion_r240092674
  
--- Diff: src/java/org/apache/cassandra/io/compress/ZSTDCompressor.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.compress;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.github.luben.zstd.Zstd;
+
+public class ZSTDCompressor implements ICompressor
+{
+public static final int FAST_COMPRESSION = 1; // fastest compression 
time
+public static final int DEFAULT_LEVEL = 3;
+public static final int BEST_COMPRESSION = 22;// very good compression 
ratio
+private static final ZSTDCompressor instance = new ZSTDCompressor();
+private static final String COMPRESSION_LEVEL_OPTION_NAME = 
"compression_level";
+@VisibleForTesting
+protected static final int compressionLevel = DEFAULT_LEVEL;
+
+public static ZSTDCompressor create(Map<String, String> compressionOptions)
+{
+
validateCompressionLevel(parseCompressionLevelOption(compressionOptions));
+return instance;
+}
+
+private static void validateCompressionLevel(int compressionLevel)
+{
+if (compressionLevel < FAST_COMPRESSION || compressionLevel > 
BEST_COMPRESSION)
+{
+throw new IllegalArgumentException(
+"ZSTD compression_level " + 
Integer.toString(compressionLevel) + " invalid ",
+null
+);
+}
+}
+
+private static int parseCompressionLevelOption(Map<String, String> compressionOptions)
+{
+return 
Integer.parseInt(compressionOptions.getOrDefault(COMPRESSION_LEVEL_OPTION_NAME,
+
Integer.toString(DEFAULT_LEVEL)));
+}
+
+@Override
+public int initialCompressedBufferLength(int chunkLength)
+{
+return (int)Zstd.compressBound(chunkLength);
+}
+
+@Override
+public int uncompress(byte[] input, int inputOffset, int inputLength, 
byte[] output, int outputOffset)
+throws IOException
+{
+long decompressSize = Zstd.decompressByteArray(output,
+   outputOffset,
+   output.length - 
outputOffset,
+   input,
+   inputOffset,
+   inputLength);
+if (Zstd.isError(decompressSize))
+{
+throw new IOException("ZSTD uncompress failed with error 
reason " + Zstd.getErrorName(decompressSize));
+}
+
+return (int) decompressSize;
+}
+
+@Override
+public void uncompress(ByteBuffer input, ByteBuffer output) throws 
IOException
+{
+Zstd.decompress(output, input);
+}
+
+@Override
+public void compress(ByteBuffer input, ByteBuffer output) throws 
IOException
--- End diff --

I think you may be able to replace this with the [``compress(ByteBuffer 
dstBuf, ByteBuffer srcBuf, int 
level)``](https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/Zstd.java#L535)
 method, you just have to make sure to pass the arguments in the opposite order 
(output comes first) and re-throw the `RuntimeException` as an `IOException`.


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-03 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r238172698
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
--- End diff --

Ok, I added a cancel+wait method to do this, and added some synchronization 
around `close`/`open` that I believe is needed to ensure that this method 
works 100% of the time during snitch updates.

This actually fixes a bug I believe exists in trunk where we just hoped 
that we didn't get an exception on Snitch construction in 
https://github.com/apache/cassandra/blob/f3609995c09570d523527d9bd0fd69c2bc65d986/src/java/org/apache/cassandra/service/StorageService.java#L4975-L4986
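
One way to get the "cancel and wait" semantics discussed here is to shut the scheduling executor down and await termination, after which no task body can still be executing. This is only a hedged illustration of the semantics; the actual patch adds its own helper, which may be implemented differently:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of cancel-and-wait semantics for a background scheduler: shutdown()
// stops new runs from being scheduled, and awaitTermination() blocks until any
// in-flight run has finished (or the timeout elapses).
class CancelAndWaitSketch
{
    static boolean stopAndWait(ScheduledExecutorService scheduler, long timeout, TimeUnit unit)
            throws InterruptedException
    {
        scheduler.shutdown();                             // no new runs are scheduled
        return scheduler.awaitTermination(timeout, unit); // wait for in-flight runs
    }
}
```

By default a `ScheduledThreadPoolExecutor` cancels outstanding periodic tasks on `shutdown()`, so termination completes promptly once the current run (if any) finishes.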


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-03 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r238169787
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
--- End diff --

This needs to be here so that the bookkeeping clocks keep ticking. Basically 
we do the calculation but don't send any probes (respecting that disabled 
gossip indicates we shouldn't be sending messages).


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-12-02 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r238167792
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,20 +228,101 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScheduler != null)
+updateScheduler.cancel(false);
+if (latencyProbeScheduler != null)
+latencyProbeScheduler.cancel(false);
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Determines if latency probes need to be sent, and potentially sends 
a single latency probe per invocation
+ */
+protected void maybeSendLatencyProbe()
+{
+long currentTimeNS = System.nanoTime();
+markNextAllowedProbeGenerationTime(currentTimeNS);
+
+Optional<InetAddressAndPort> needsProbe = latencyProbeNeeded(currentTimeNS);
+needsProbe.ifPresent(this::sendPingMessageToPeer);
+}
+
+/**
+ * Determines which peers need latency probes at a particular time. Note that this takes currentTimeNS for testability
+ * of this code path.
+ * @param currentTimeNS The current time to evaluate. Used mostly for 
testing.
+ * @return An Optional that if present contains a host to probe.
+ */
+@VisibleForTesting
+Optional<InetAddressAndPort> latencyProbeNeeded(long currentTimeNS) {
+if (currentProbePosition >= latencyProbeSequence.size() && 
(currentTimeNS > nextProbeGenerationTime))
+{
+nextProbeGenerationTime = nextAllowedProbeGenerationTime;
+latencyProbeSequence.clear();
+
+// Delegate to the subclass to actually figure out what the 
probe sequence should be
+updateLatencyProbeSequence(latencyProbeSequence);
+
+if (latencyProbeSequence.size() > 0)
+Collections.shuffle(latencyProbeSequence);
+
+currentProbePosition = 0;
+}
+
+if (currentProbePosition < latencyProbeSequence.size())
+{
+try
+{
+return 
Optional.of(latencyProbeSequence.get(currentProbePosition++));
+}
+catch (IndexOutOfBoundsException ignored) {}
+}
+
+return Optional.empty();
+}
+
+private void sendPingMessageToPeer(InetAddressAndPort to)
+{
+logger.trace("Sending a small and large PingMessage to {}", to);
+IAsyncCallback latencyProbeHandler = new IAsyncCallback()
--- End diff --

In the latest version I kept the changed API, but I can remove it if you think 
it'll make the PR easier to review/merge.


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237979844
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
+return;
+
+schedulePings(samples);
+}
+
+/**
+ * This method mutates the passed AnnotatedMeasurements to implement 
capped exponential backoff per endpoint.
+ *
+ * The algorithm is as follows:
+ * 1. All samples get their millisSinceLastMeasure and 
millisSinceLastRequest fields
+ *incremented by the passed interval
+ * 2. Any recently requested (ranked) endpoints that have not been 
measured recently (e.g. because the snitch
+ *has sent them no traffic) get probes with exponential backoff.
+ *
+ * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the 
probes are stopped after
+ * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
+ *
+ * At the end of this method, any passed AnnotatedMeasurements that 
need latency probes will have non zero
+ * probeTimerMillis members set.
+ */
+@VisibleForTesting
+static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+if 
(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
+continue;
+
+AnnotatedMeasurement measurement = entry.getValue();
+long lastMeasure = 
measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
+long lastRequest = 
measurement.millisSinceLastRequest.getAndAdd(intervalMillis);
+
+if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < 
MAX_PROBE_INTERVAL_MS)
+{
+if (measurement.probeTimerMillis == 0)
+{
+measurement.probeTimerMillis = intervalMillis;
--- End diff --

It might not be null if we had scheduled a 10 minute probe and then got a 
real reading.


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237978698
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
+return;
+
+schedulePings(samples);
+}
+
+/**
+ * This method mutates the passed AnnotatedMeasurements to implement 
capped exponential backoff per endpoint.
+ *
+ * The algorithm is as follows:
+ * 1. All samples get their millisSinceLastMeasure and 
millisSinceLastRequest fields
+ *incremented by the passed interval
+ * 2. Any recently requested (ranked) endpoints that have not been 
measured recently (e.g. because the snitch
+ *has sent them no traffic) get probes with exponential backoff.
+ *
+ * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the 
probes are stopped after
+ * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
+ *
+ * At the end of this method, any passed AnnotatedMeasurements that 
need latency probes will have non zero
+ * probeTimerMillis members set.
+ */
+@VisibleForTesting
+static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+if 
(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
+continue;
+
+AnnotatedMeasurement measurement = entry.getValue();
+long lastMeasure = 
measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
+long lastRequest = 
measurement.millisSinceLastRequest.getAndAdd(intervalMillis);
+
+if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < 
MAX_PROBE_INTERVAL_MS)
+{
+if (measurement.probeTimerMillis == 0)
+{
+measurement.probeTimerMillis = intervalMillis;
+}
+else if (measurement.probeFuture != null && 
measurement.probeFuture.isDone())
+{
+measurement.probeTimerMillis = 
Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2);
+}
+}
+else
+{
+measurement.probeTimerMillis = 0;
+}
+}
+}
+
+@VisibleForTesting
+void schedulePings(Map<InetAddressAndPort, AnnotatedMeasurement> samples)
+{
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+AnnotatedMeasurement measurement = entry.getValue();
+long delay = measurement.probeTimerMillis;
+long millisSinceLastRequest = 
measurement.millisSinceLastRequest.get();
+
+if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && 
!Gossiper.instance.isAlive(entry.getKey()))
+{
+samples.remove(entry.getKey());
+}
+
+if (delay > 0 && millisSinc

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237971740
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
+return;
+
+schedulePings(samples);
+}
+
+/**
+ * This method mutates the passed AnnotatedMeasurements to implement 
capped exponential backoff per endpoint.
+ *
+ * The algorithm is as follows:
+ * 1. All samples get their millisSinceLastMeasure and 
millisSinceLastRequest fields
+ *incremented by the passed interval
+ * 2. Any recently requested (ranked) endpoints that have not been 
measured recently (e.g. because the snitch
+ *has sent them no traffic) get probes with exponential backoff.
+ *
+ * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the 
probes are stopped after
+ * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
+ *
+ * At the end of this method, any passed AnnotatedMeasurements that 
need latency probes will have non zero
+ * probeTimerMillis members set.
+ */
+@VisibleForTesting
+static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+if 
(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
+continue;
+
+AnnotatedMeasurement measurement = entry.getValue();
+long lastMeasure = 
measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
--- End diff --

The messaging rate is so low, can we punt on the metric for now? I'm happy to 
add it as a follow-up patch, but I'd like to get the pluggability and probes in 
first.


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237961057
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
+return;
+
+schedulePings(samples);
+}
+
+/**
+ * This method mutates the passed AnnotatedMeasurements to implement 
capped exponential backoff per endpoint.
+ *
+ * The algorithm is as follows:
+ * 1. All samples get their millisSinceLastMeasure and 
millisSinceLastRequest fields
+ *incremented by the passed interval
+ * 2. Any recently requested (ranked) endpoints that have not been 
measured recently (e.g. because the snitch
+ *has sent them no traffic) get probes with exponential backoff.
+ *
+ * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the 
probes are stopped after
+ * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
+ *
+ * At the end of this method, any passed AnnotatedMeasurements that 
need latency probes will have non zero
+ * probeTimerMillis members set.
+ */
+@VisibleForTesting
+static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+if 
(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
+continue;
+
+AnnotatedMeasurement measurement = entry.getValue();
+long lastMeasure = 
measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
+long lastRequest = 
measurement.millisSinceLastRequest.getAndAdd(intervalMillis);
+
+if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < 
MAX_PROBE_INTERVAL_MS)
+{
+if (measurement.probeTimerMillis == 0)
+{
+measurement.probeTimerMillis = intervalMillis;
+}
+else if (measurement.probeFuture != null && 
measurement.probeFuture.isDone())
+{
+measurement.probeTimerMillis = 
Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2);
+}
+}
+else
+{
+measurement.probeTimerMillis = 0;
+}
+}
+}
+
+@VisibleForTesting
+void schedulePings(Map<InetAddressAndPort, AnnotatedMeasurement> samples)
+{
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+AnnotatedMeasurement measurement = entry.getValue();
+long delay = measurement.probeTimerMillis;
+long millisSinceLastRequest = 
measurement.millisSinceLastRequest.get();
+
+if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && 
!Gossiper.instance.isAlive(entry.getKey()))
+{
+samples.remove(entry.getKey());
+}
+
+if (delay > 0 && millisSinc
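The capped exponential backoff that `calculateProbes` applies per endpoint can be exercised as a small standalone sketch. The class and method names below are hypothetical, and unlike the real method this sketch omits the check that the previous probe future has completed before doubling:

```java
// Hypothetical standalone sketch of the capped exponential backoff in
// calculateProbes; it omits the real method's probe-future-completion
// check before doubling.
public class ProbeBackoffSketch
{
    public static final long MAX_PROBE_INTERVAL_MS = 10 * 60 * 1000L; // probe/backoff cap
    public static final long MIN_PROBE_INTERVAL_MS = 60 * 1000L;      // quiet period before probing starts

    /**
     * @return the next probe timer in milliseconds; 0 means "no probe needed".
     */
    public static long nextProbeTimer(long currentTimerMillis, long intervalMillis,
                                      long millisSinceLastMeasure, long millisSinceLastRequest)
    {
        // Probe only endpoints that were ranked recently but not measured recently
        if (millisSinceLastMeasure >= MIN_PROBE_INTERVAL_MS && millisSinceLastRequest < MAX_PROBE_INTERVAL_MS)
        {
            if (currentTimerMillis == 0)
                return intervalMillis;                                      // first probe: one interval out
            return Math.min(MAX_PROBE_INTERVAL_MS, currentTimerMillis * 2); // then double, capped
        }
        return 0; // recently measured, or unranked for too long: stop probing
    }

    public static void main(String[] args)
    {
        long timer = 0;
        for (int i = 0; i < 12; i++)
            timer = nextProbeTimer(timer, 1000, MIN_PROBE_INTERVAL_MS, 0);
        System.out.println(timer); // prints 600000: 1s, 2s, 4s, ... capped at 10 minutes
    }
}
```

A stopped endpoint re-enters the schedule with a short timer as soon as it is ranked again, since the timer restarts from one interval.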

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237960721
  
--- Diff: src/java/org/apache/cassandra/service/StorageService.java ---
@@ -4940,42 +4940,61 @@ public int getDynamicUpdateInterval()
 return DatabaseDescriptor.getDynamicUpdateInterval();
 }
 
+@Override
 public void updateSnitch(String epSnitchClassName, Boolean dynamic, 
Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double 
dynamicBadnessThreshold) throws ClassNotFoundException
+{
+updateSnitch(epSnitchClassName, dynamic ? "" : null, 
dynamicUpdateInterval, null, dynamicBadnessThreshold);
+}
+
+@Override
+public void updateSnitch(String epSnitchClassName, String 
dynamicSnitchClassName, Integer dynamicUpdateInterval, Integer 
dynamicSampleUpdateInterval, Double dynamicBadnessThreshold) throws 
ClassNotFoundException
 {
 // apply dynamic snitch configuration
 if (dynamicUpdateInterval != null)
 
DatabaseDescriptor.setDynamicUpdateInterval(dynamicUpdateInterval);
-if (dynamicResetInterval != null)
-
DatabaseDescriptor.setDynamicResetInterval(dynamicResetInterval);
 if (dynamicBadnessThreshold != null)
 
DatabaseDescriptor.setDynamicBadnessThreshold(dynamicBadnessThreshold);
+if (dynamicSampleUpdateInterval != null)
--- End diff --

That's what I figured, also makes testing a lot easier. I added a bunch of 
tests of the various swaps as well.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237960560
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,20 +310,155 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScheduler != null)
+updateScheduler.cancel(false);
+if (latencyProbeScheduler != null)
+latencyProbeScheduler.cancel(false);
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Determines if latency probes need to be sent, and potentially sends 
a single latency probe per invocation
+ */
+protected void maybeSendLatencyProbe()
+{
+if (!StorageService.instance.isGossipActive())
+return;
+
+currentProbePosition = latencyProbeNeeded(samples, 
latencyProbeSequence, currentProbePosition);
+
+if (currentProbePosition < latencyProbeSequence.size())
+{
+try
+{
+InetAddressAndPort peer = 
latencyProbeSequence.get(currentProbePosition);
+sendPingMessageToPeer(peer);
+}
+catch (IndexOutOfBoundsException ignored) {}
+}
+}
+
+/**
+ * This method (unfortunately) mutates a lot of state so that it 
doesn't create any garbage and only iterates the
+ * sample map a single time. In particular on every call we:
+ *  - increment every sample's intervalsSinceLastMeasure
+ *
+ * When probes should be generated we also potentially:
+ *  - reset sample's recentlyRequested that have reached the 
"CONSTANT" phase of probing (10 minutes by default)
+ *  - add any InetAddressAndPort's that need probing to the provided 
endpointsToProbe
+ *  - shuffle the endpointsToProbe
+ *
+ * If there are probes to be sent, this method short circuits all 
generation of probes and just returns the
+ * passed probePosition plus one.
+ * @return The position of the passed endpointsToProbe that should be 
probed.
+ */
+@VisibleForTesting
+int latencyProbeNeeded(Map<InetAddressAndPort, AnnotatedMeasurement> samples,
+ List<InetAddressAndPort> endpointsToProbe, int probePosition) {
+boolean shouldGenerateProbes = (probePosition >= 
endpointsToProbe.size());
+
+if (shouldGenerateProbes)
+{
+endpointsToProbe.clear();
+samples.keySet().retainAll(Gossiper.instance.getLiveMembers());
+}
+
+// We have to increment intervalsSinceLastMeasure regardless of if 
we generate probes
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+AnnotatedMeasurement measurement = entry.getValue();
+long intervalsSinceLastMeasure = 
measurement.intervalsSinceLastMeasure.getAndIncrement();
--- End diff --

I believe that I've addressed this feedback, although I didn't end up 
looking at the futures in receiveTiming since that's performance critical 
(we'll maybe send an extra probe, not a big deal imo)


---
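The pattern described above — walking a probe list one endpoint per invocation, and only regenerating (and shuffling) it once a pass is exhausted — can be sketched standalone as follows. Names are hypothetical and endpoints are reduced to strings:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch (hypothetical names) of the probe-walk pattern:
// one background task probes a single endpoint per invocation, and only
// rebuilds + shuffles the list once the previous pass is exhausted.
public class ProbeWalkSketch
{
    /**
     * @return the position to probe next; a position past the end of
     * endpointsToProbe means "nothing to probe on this invocation".
     */
    public static int advance(List<String> endpointsToProbe, int probePosition, List<String> needingProbes)
    {
        if (probePosition >= endpointsToProbe.size())
        {
            // Previous pass exhausted: rebuild and shuffle the probe list
            endpointsToProbe.clear();
            endpointsToProbe.addAll(needingProbes);
            Collections.shuffle(endpointsToProbe);
            return 0;
        }
        // Short circuit: a pass is in flight, just move to the next endpoint
        return probePosition + 1;
    }

    public static void main(String[] args)
    {
        List<String> toProbe = new ArrayList<>();
        int pos = advance(toProbe, 0, List.of("a", "b")); // rebuilds the list, pos = 0
        pos = advance(toProbe, pos, List.of("a", "b"));   // pass in flight, pos = 1
        System.out.println(toProbe.size() + " " + pos);   // prints "2 1"
    }
}
```

The shuffle spreads probe load so the same endpoint is not always probed first in every pass.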




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237958792
  
--- Diff: 
src/java/org/apache/cassandra/locator/dynamicsnitch/DynamicEndpointSnitchEMA.java
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator.dynamicsnitch;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.locator.DynamicEndpointSnitch;
+import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.metrics.ExponentialMovingAverage;
+
+
+/**
+ * A dynamic snitching implementation that uses Exponentially Moving 
Averages as a low pass filter to prefer
+ * or de-prefer hosts
+ *
+ * This implementation generates a few orders of magnitude less garbage 
than histograms and is close to 10x faster,
+ * but as it is not a Median LPF (it is an Average LPF), it is more 
vulnerable to noise. This may be acceptable but
+ * given the significant change in behavior this is not the default in 4.0
+ */
+public class DynamicEndpointSnitchEMA extends DynamicEndpointSnitch
+{
+// A ~10 sample EMA heavily weighted to the past values to minimize 
noise
+private static final double EMA_ALPHA = 0.10;
+
+protected static class EMASnitchMeasurement implements 
ISnitchMeasurement
+{
+public final ExponentialMovingAverage avg;
+
+EMASnitchMeasurement(double initial)
+{
+avg = new ExponentialMovingAverage(EMA_ALPHA, initial);
+}
+
+@Override
+public void sample(long value)
+{
+avg.update(value);
+}
+
+@Override
+public double measure()
+{
+return avg.getAvg();
+}
+
+@Override
+public Iterable<Double> measurements()
+{
+return Collections.singletonList(avg.getAvg());
+}
+}
+
+// Called via reflection
+public DynamicEndpointSnitchEMA(IEndpointSnitch snitch)
+{
+this(snitch, "ema");
+}
+
+public DynamicEndpointSnitchEMA(IEndpointSnitch snitch, String 
instance)
+{
+super(snitch, instance);
+}
+
+@Override
+protected ISnitchMeasurement measurementImpl(long initialValue)
+{
+return new EMASnitchMeasurement(initialValue);
+}
+
+/**
+ * Unlike the Histogram implementation, calling this measure method is 
reasonably cheap (doesn't require a
+ * Snapshot or anything) so we can skip a round of iterations and just 
normalize the scores slightly
+ * differently
+ */
+@Override
+public Map<InetAddressAndPort, Double> calculateScores()
+{
+// We're going to weight the latency for each host against the 
worst one we see, to
+// arrive at sort of a 'badness percentage' for them. First, find 
the worst for each:
+HashMap<InetAddressAndPort, Double> newScores = new HashMap<>(samples.size());
+Optional<Double> maxObservedAvgLatency = samples.values().stream()
--- End diff --

I went ahead and removed the EMA entirely, let's follow up on that in 
another change.


---
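For reference, the two ideas in the diff above — an EMA used as a low-pass filter, and normalising each host's latency against the worst observed one into a "badness percentage" — can be sketched like this. The update rule is the standard EMA form and is an assumption about what `ExponentialMovingAverage` does; `badnessScores` is a hypothetical stand-in for `calculateScores`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a standard EMA update (assumed behaviour of
// ExponentialMovingAverage) plus worst-case normalisation of latencies
// into scores in [0, 1], where 1.0 is the slowest host.
public class EmaScoreSketch
{
    // alpha = 0.10 gives an effective window of roughly 1/alpha = 10 samples
    public static final double EMA_ALPHA = 0.10;

    // New values contribute alpha, history contributes (1 - alpha)
    public static double update(double avg, double value)
    {
        return avg + EMA_ALPHA * (value - avg);
    }

    public static Map<String, Double> badnessScores(Map<String, Double> latencies)
    {
        double max = latencies.values().stream().mapToDouble(Double::doubleValue).max().orElse(1.0);
        Map<String, Double> scores = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : latencies.entrySet())
            scores.put(e.getKey(), e.getValue() / max);
        return scores;
    }

    public static void main(String[] args)
    {
        // A single 100ms outlier barely moves a heavily past-weighted 10ms average
        System.out.println(update(10.0, 100.0)); // ~19.0
        System.out.println(badnessScores(Map.of("a", 10.0, "b", 20.0)));
    }
}
```

The heavy past-weighting is what makes the average-based filter cheap but noise-sensitive compared to a median filter, as the comment in the diff notes.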




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-30 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r237956751
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
+return;
+
+schedulePings(samples);
+}
+
+/**
+ * This method mutates the passed AnnotatedMeasurements to implement 
capped exponential backoff per endpoint.
+ *
+ * The algorithm is as follows:
+ * 1. All samples get their millisSinceLastMeasure and 
millisSinceLastRequest fields
+ *incremented by the passed interval
+ * 2. Any recently requested (ranked) endpoints that have not been 
measured recently (e.g. because the snitch
+ *has sent them no traffic) get probes with exponential backoff.
+ *
+ * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the 
probes are stopped after
+ * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
+ *
+ * At the end of this method, any passed AnnotatedMeasurements that 
need latency probes will have non zero
+ * probeTimerMillis members set.
+ */
+@VisibleForTesting
+static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+if 
(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
+continue;
+
+AnnotatedMeasurement measurement = entry.getValue();
+long lastMeasure = 
measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
+long lastRequest = 
measurement.millisSinceLastRequest.getAndAdd(intervalMillis);
+
+if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < 
MAX_PROBE_INTERVAL_MS)
+{
+if (measurement.probeTimerMillis == 0)
+{
+measurement.probeTimerMillis = intervalMillis;
+}
+else if (measurement.probeFuture != null && 
measurement.probeFuture.isDone())
+{
+measurement.probeTimerMillis = 
Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2);
+}
+}
+else
+{
+measurement.probeTimerMillis = 0;
+}
+}
+}
+
+@VisibleForTesting
+void schedulePings(Map<InetAddressAndPort, AnnotatedMeasurement> samples)
+{
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+AnnotatedMeasurement measurement = entry.getValue();
+long delay = measurement.probeTimerMillis;
+long millisSinceLastRequest = 
measurement.millisSinceLastRequest.get();
+
+if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && 
!Gossiper.instance.isAlive(entry.getKey()))
+{
+samples.remove(entry.getKey());
+}
+
+if (delay > 0 && millisSinc

[GitHub] cassandra pull request #294: Fixing logging of netty client related ioexcept...

2018-11-29 Thread sumanth-pasupuleti
Github user sumanth-pasupuleti commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/294#discussion_r237562392
  
--- Diff: src/java/org/apache/cassandra/transport/Message.java ---
@@ -710,7 +710,7 @@ public boolean apply(Throwable exception)
 boolean isIOException = exception instanceof IOException || 
(exception.getCause() instanceof IOException);
 if (!alwaysLogAtError && isIOException)
 {
-if 
(ioExceptionsAtDebugLevel.contains(exception.getMessage()))
+if 
(ioExceptionsAtDebugLevel.stream().anyMatch(ioExceptionAtDebugLevel -> 
exception.getMessage().contains(ioExceptionAtDebugLevel)))
--- End diff --

Thank you, I was under the impression that the streaming API is better than a for loop. Fixed.


---




[GitHub] cassandra pull request #294: Fixing logging of netty client related ioexcept...

2018-11-29 Thread jasobrown
Github user jasobrown commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/294#discussion_r237509397
  
--- Diff: src/java/org/apache/cassandra/transport/Message.java ---
@@ -710,7 +710,7 @@ public boolean apply(Throwable exception)
 boolean isIOException = exception instanceof IOException || 
(exception.getCause() instanceof IOException);
 if (!alwaysLogAtError && isIOException)
 {
-if 
(ioExceptionsAtDebugLevel.contains(exception.getMessage()))
+if 
(ioExceptionsAtDebugLevel.stream().anyMatch(ioExceptionAtDebugLevel -> 
exception.getMessage().contains(ioExceptionAtDebugLevel)))
--- End diff --

I'd prefer to avoid the streaming API as it creates a bunch of excess 
garbage. How about something like this instead:

```java
// exceptions thrown from the netty epoll transport add the 
name of the function that failed
// to the exception string (which is simply wrapping a JDK 
exception), so we can't do a simple/naive comparison
String errorMessage = exception.getMessage();
boolean logAtTrace = false;

for (String s : ioExceptionsAtDebugLevel)
{
if (errorMessage.contains(s))
{
logAtTrace = true;
break;
}
}

if (logAtTrace)
{
// Likely unclean client disconnects
logger.trace(message, exception);
}
else
{
// Generally unhandled IO exceptions are network 
issues, not actual ERRORS
logger.info(message, exception);
}
```


---
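The suggested loop can be lifted into a standalone helper (hypothetical name) to see the behaviour the comment describes — substring matching is needed because the netty epoll transport prefixes the failing syscall to the JDK exception message:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper mirroring the suggested loop: a plain for loop
// avoids the per-call lambda/stream allocation that Stream.anyMatch
// would add on this hot error-handling path.
public class ContainsAnySketch
{
    public static boolean containsAny(String errorMessage, List<String> needles)
    {
        for (String s : needles)
            if (errorMessage.contains(s))
                return true;
        return false;
    }

    public static void main(String[] args)
    {
        // The syscall prefix means exact equality against the message would miss it
        List<String> ioExceptionsAtDebugLevel = Arrays.asList("Connection reset by peer", "Broken pipe");
        System.out.println(containsAny("writev(..) failed: Connection reset by peer",
                                       ioExceptionsAtDebugLevel)); // prints true
    }
}
```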




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-27 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r236776225
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
+return;
+
+schedulePings(samples);
+}
+
+/**
+ * This method mutates the passed AnnotatedMeasurements to implement 
capped exponential backoff per endpoint.
+ *
+ * The algorithm is as follows:
+ * 1. All samples get their millisSinceLastMeasure and 
millisSinceLastRequest fields
+ *incremented by the passed interval
+ * 2. Any recently requested (ranked) endpoints that have not been 
measured recently (e.g. because the snitch
+ *has sent them no traffic) get probes with exponential backoff.
+ *
+ * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the 
probes are stopped after
+ * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
+ *
+ * At the end of this method, any passed AnnotatedMeasurements that 
need latency probes will have non-zero
+ * probeTimerMillis members set.
+ */
+@VisibleForTesting
+static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
+for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
+{
+if 
(entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
+continue;
+
+AnnotatedMeasurement measurement = entry.getValue();
+long lastMeasure = 
measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
+long lastRequest = 
measurement.millisSinceLastRequest.getAndAdd(intervalMillis);
+
+if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < 
MAX_PROBE_INTERVAL_MS)
+{
+if (measurement.probeTimerMillis == 0)
+{
+measurement.probeTimerMillis = intervalMillis;
+}
+else if (measurement.probeFuture != null && 
measurement.probeFuture.isDone())
+{
+measurement.probeTimerMillis = 
Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2);
--- End diff --

Ack, +1


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-27 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r236775944
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -23,128 +23,300 @@
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.codahale.metrics.Snapshot;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import 
org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi 
failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) 
are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this 
class.
  */
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-private static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
-
-private static final double ALPHA = 0.75; // set to 0.75 to make EDS 
more biased to towards the newer values
-private static final int WINDOW_SIZE = 100;
-
-private volatile int dynamicUpdateInterval = 
DatabaseDescriptor.getDynamicUpdateInterval();
-private volatile int dynamicResetInterval = 
DatabaseDescriptor.getDynamicResetInterval();
-private volatile double dynamicBadnessThreshold = 
DatabaseDescriptor.getDynamicBadnessThreshold();
+private static final Logger logger = 
LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+// Latency measurement and ranking. The samples contain latency 
measurements and the scores are used for ranking
+protected boolean registered = false;
+protected static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
+protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new ConcurrentHashMap<>();
+
+// Latency probe functionality for actively probing endpoints that we 
haven't measured recently but are ranking
+public static final long MAX_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+public static final long MIN_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ;
+// The probe rate is set later when configuration is read
+protected static final RateLimiter probeRateLimiter = 
RateLimiter.create(1);
+protected static final DebuggableScheduledThreadPoolExecutor 
latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, 
"LatencyProbes", Thread.MIN_PRIORITY);
--- End diff --

   

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-27 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r236765727
  
--- Diff: 
test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---
@@ -19,34 +19,104 @@
 package org.apache.cassandra.locator;
 
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.util.*;
 
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchEMA;
+import 
org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import 
org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchLegacyHistogram;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.async.TestScheduledFuture;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
 public class DynamicEndpointSnitchTest
 {
+private static InetAddressAndPort[] hosts;
+// Reduce the update interval significantly so that tests run quickly
+private static final long UPDATE_INTERVAL_MS = 10;
+// Intentionally 31 and a little bit instead of 30 seconds flat so this doesn't divide evenly into the default
+// MAX_PROBE_INTERVAL_MS. Also pretty high so latency probes don't 
interfere with the unit tests
+private static final long PING_INTERVAL_MS = 31 * 1003;
+
+private final DynamicEndpointSnitch dsnitch;
+
+public DynamicEndpointSnitchTest(DynamicEndpointSnitch dsnitch)
+{
+this.dsnitch = dsnitch;
+}
+
+@Before
+public void prepareDES()
+{
+for (InetAddressAndPort host : hosts)
+{
+Gossiper.instance.initializeNodeUnsafe(host, 
UUID.randomUUID(), 1);
+Gossiper.instance.realMarkAlive(host, 
Gossiper.instance.getEndpointStateForEndpoint(host));
+}
+dsnitch.reset();
+}
 
-@BeforeClass
-public static void setupDD()
+@Parameterized.Parameters(name="{index}: {0}")
+public static Iterable<DynamicEndpointSnitch> getDESImplementation() throws UnknownHostException
 {
 DatabaseDescriptor.daemonInitialization();
+// do this because SS needs to be initialized before DES can work 
properly.
+StorageService.instance.unsafeInitialize();
+
+hosts = new InetAddressAndPort[] {
+FBUtilities.getBroadcastAddressAndPort(),
+InetAddressAndPort.getByName("127.0.0.2"),
+InetAddressAndPort.getByName("127.0.0.3"),
+InetAddressAndPort.getByName("127.0.0.4"),
+InetAddressAndPort.getByName("127.0.0.5"),
+};
+
+SimpleSnitch ss1 = new SimpleSnitch();
+DynamicEndpointSnitch probeDES = new 
DynamicEndpointSnitchHistogram(ss1, String.valueOf(ss1.hashCode()));
+probeDES.applyConfigChanges((int) UPDATE_INTERVAL_MS, (int) 
PING_INTERVAL_MS, DatabaseDescriptor.getDynamicBadnessThreshold());
+
+SimpleSnitch ss2 = new SimpleSnitch();
+DynamicEndpointSnitch oldDES = new 
DynamicEndpointSnitchLegacyHistogram(ss2, String.valueOf(ss2.hashCode()));
+oldDES.applyConfigChanges((int) UPDATE_INTERVAL_MS, (int) 
PING_INTERVAL_MS, DatabaseDescriptor.getDynamicBadnessThreshold());
+
+SimpleSnitch ss3 = new SimpleSnitch();
+DynamicEndpointSnitch emaDES = new DynamicEndpointSnitchEMA(ss3, 
String.valueOf(ss3.hashCode()));
+emaDES.applyConfigChanges((int) UPDATE_INTERVAL_MS, (int) 
PING_INTERVAL_MS, DatabaseDescriptor.getDynamicBadnessThreshold());
+
+return Arrays.asList(probeDES, oldDES, emaDES);
+}
+
+@After
+public void resetDES()
+{
+dsnitch.reset();
 }
 
private static void setScores(DynamicEndpointSnitch dsnitch, int rounds, List<InetAddressAndPort> hosts, Integer... scores) throws InterruptedException
 {
 for (int round = 0; round < rounds; round++)
 {
 for (int i = 0; i < hosts.size(); i++)
-dsnitch.receiveTiming(hosts.get(i), scores[i]);
+dsnitch.receiveTiming(hosts.get(i), scores[i], 
LatencyM

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-27 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r236764154
  
--- Diff: 
test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---
@@ -60,57 +130,229 @@ private static EndpointsForRange 
full(InetAddressAndPort... endpoints)
 }
 
 @Test
-public void testSnitch() throws InterruptedException, IOException, 
ConfigurationException
+public void testSortedByProximity() throws InterruptedException, 
IOException, ConfigurationException
 {
-// do this because SS needs to be initialized before DES can work 
properly.
-StorageService.instance.unsafeInitialize();
-SimpleSnitch ss = new SimpleSnitch();
-DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, 
String.valueOf(ss.hashCode()));
-InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-InetAddressAndPort host1 = 
InetAddressAndPort.getByName("127.0.0.2");
-InetAddressAndPort host2 = 
InetAddressAndPort.getByName("127.0.0.3");
-InetAddressAndPort host3 = 
InetAddressAndPort.getByName("127.0.0.4");
-InetAddressAndPort host4 = 
InetAddressAndPort.getByName("127.0.0.5");
-List<InetAddressAndPort> hosts = Arrays.asList(host1, host2, host3);
+InetAddressAndPort self = hosts[0];
+List<InetAddressAndPort> allHosts = Arrays.asList(hosts[1], hosts[2], hosts[3]);
 
 // first, make all hosts equal
-setScores(dsnitch, 1, hosts, 10, 10, 10);
-EndpointsForRange order = full(host1, host2, host3);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host1 a little worse
-setScores(dsnitch, 1, hosts, 20, 10, 10);
-order = full(host2, host3, host1);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host2 as bad as host1
-setScores(dsnitch, 2, hosts, 15, 20, 10);
-order = full(host3, host1, host2);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host3 the worst
-setScores(dsnitch, 3, hosts, 10, 10, 30);
-order = full(host1, host2, host3);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host3 equal to the others
-setScores(dsnitch, 5, hosts, 10, 10, 10);
-order = full(host1, host2, host3);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
+setScores(dsnitch, 1, allHosts, 10, 10, 10);
+EndpointsForRange order = full(hosts[1], hosts[2], hosts[3]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[1] a little worse
+setScores(dsnitch, 2, allHosts, 20, 10, 10);
+order = full(hosts[2], hosts[3], hosts[1]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[2] as bad as hosts[1]
+setScores(dsnitch, 4, allHosts, 15, 20, 10);
+order = full(hosts[3], hosts[1], hosts[2]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[3] the worst
+setScores(dsnitch, 10, allHosts, 10, 10, 30);
+order = full(hosts[1], hosts[2], hosts[3]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[3] equal to the others
+setScores(dsnitch, 15, allHosts, 10, 10, 10);
+order = full(hosts[1], hosts[2], hosts[3]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
 
 /// Tests CASSANDRA-6683 improvements
 // make the scores differ enough from the ideal order that we sort 
by score; under the old
 // dynamic snitch behavior (where we only compared neighbors), 
these wouldn't get sorted
-setScores(dsnitch, 20, hosts, 10, 70, 20);
-order = full(host1, host3, host2);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
+dsnitch.reset();
+setScores(dsnitch, 20, allHosts, 10, 70, 20);
+order = full(hosts[1], hosts[3], hosts[2]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+order = full(hosts[4], hosts[1], hosts[3], hosts[2]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(s

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-27 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r236763554
  
--- Diff: 
test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---
@@ -60,57 +130,229 @@ private static EndpointsForRange 
full(InetAddressAndPort... endpoints)
 }
 
 @Test
-public void testSnitch() throws InterruptedException, IOException, 
ConfigurationException
+public void testSortedByProximity() throws InterruptedException, 
IOException, ConfigurationException
 {
-// do this because SS needs to be initialized before DES can work 
properly.
-StorageService.instance.unsafeInitialize();
-SimpleSnitch ss = new SimpleSnitch();
-DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, 
String.valueOf(ss.hashCode()));
-InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-InetAddressAndPort host1 = 
InetAddressAndPort.getByName("127.0.0.2");
-InetAddressAndPort host2 = 
InetAddressAndPort.getByName("127.0.0.3");
-InetAddressAndPort host3 = 
InetAddressAndPort.getByName("127.0.0.4");
-InetAddressAndPort host4 = 
InetAddressAndPort.getByName("127.0.0.5");
-List<InetAddressAndPort> hosts = Arrays.asList(host1, host2, host3);
+InetAddressAndPort self = hosts[0];
+List<InetAddressAndPort> allHosts = Arrays.asList(hosts[1], hosts[2], hosts[3]);
 
 // first, make all hosts equal
-setScores(dsnitch, 1, hosts, 10, 10, 10);
-EndpointsForRange order = full(host1, host2, host3);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host1 a little worse
-setScores(dsnitch, 1, hosts, 20, 10, 10);
-order = full(host2, host3, host1);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host2 as bad as host1
-setScores(dsnitch, 2, hosts, 15, 20, 10);
-order = full(host3, host1, host2);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host3 the worst
-setScores(dsnitch, 3, hosts, 10, 10, 30);
-order = full(host1, host2, host3);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
-
-// make host3 equal to the others
-setScores(dsnitch, 5, hosts, 10, 10, 10);
-order = full(host1, host2, host3);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
+setScores(dsnitch, 1, allHosts, 10, 10, 10);
+EndpointsForRange order = full(hosts[1], hosts[2], hosts[3]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[1] a little worse
+setScores(dsnitch, 2, allHosts, 20, 10, 10);
+order = full(hosts[2], hosts[3], hosts[1]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[2] as bad as hosts[1]
+setScores(dsnitch, 4, allHosts, 15, 20, 10);
+order = full(hosts[3], hosts[1], hosts[2]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[3] the worst
+setScores(dsnitch, 10, allHosts, 10, 10, 30);
+order = full(hosts[1], hosts[2], hosts[3]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+// make hosts[3] equal to the others
+setScores(dsnitch, 15, allHosts, 10, 10, 10);
+order = full(hosts[1], hosts[2], hosts[3]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
 
 /// Tests CASSANDRA-6683 improvements
 // make the scores differ enough from the ideal order that we sort 
by score; under the old
 // dynamic snitch behavior (where we only compared neighbors), 
these wouldn't get sorted
-setScores(dsnitch, 20, hosts, 10, 70, 20);
-order = full(host1, host3, host2);
-Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(host1, host2, host3)));
+dsnitch.reset();
+setScores(dsnitch, 20, allHosts, 10, 70, 20);
+order = full(hosts[1], hosts[3], hosts[2]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(self, 
full(hosts[1], hosts[2], hosts[3])));
+
+order = full(hosts[4], hosts[1], hosts[3], hosts[2]);
+Util.assertRCEquals(order, dsnitch.sortedByProximity(s
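The assertions in the quoted test all exercise one idea: endpoints are ordered by their dynamic latency score (lower is closer), with ties preserving the caller's ideal order, rather than only comparing neighbours as before CASSANDRA-6683. A minimal standalone sketch of that ordering, with illustrative names and scores rather than Cassandra's actual implementation:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: sort all endpoints by score, keeping ties stable.
class ProximitySorter {
    static List<String> sortedByProximity(List<String> endpoints, Map<String, Double> scores) {
        List<String> sorted = new ArrayList<>(endpoints);
        // List.sort is stable, so equal scores keep the caller's (ideal) order
        sorted.sort(Comparator.comparingDouble((String e) -> scores.getOrDefault(e, 0.0)));
        return sorted;
    }
}
```

With scores 10/70/20 for host1/host2/host3 this yields host1, host3, host2, matching the CASSANDRA-6683 case asserted above.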

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-27 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r236761954
  
--- Diff: 
test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---
[quoted diff identical to the hunk above]

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-27 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r236752030
  
--- Diff: 
test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---
[quoted diff identical to the hunk above]

[GitHub] cassandra pull request #288: In BufferPool, make allocating thread receive a...

2018-11-26 Thread jonmeredith
Github user jonmeredith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/288#discussion_r236362932
  
--- Diff: src/java/org/apache/cassandra/utils/memory/BufferPool.java ---
@@ -237,23 +237,25 @@ void check()
/** Return a chunk; the caller will take ownership of the parent chunk. */
 public Chunk get()
 {
-while (true)
-{
-Chunk chunk = chunks.poll();
-if (chunk != null)
-return chunk;
+Chunk chunk = chunks.poll();
+if (chunk != null)
+return chunk;
 
-if (!allocateMoreChunks())
-// give it one last attempt, in case someone else 
allocated before us
-return chunks.poll();
-}
+chunk = allocateMoreChunks();
+if (chunk != null)
+return chunk;
+
+/*  another thread may have just allocated last macro chunk, so
+**  make one final attempt before returning null
--- End diff --

thx for the pointers, updated to a single line.
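The control flow under discussion (poll the queue, try to allocate, then make one final poll in case another thread allocated the last macro chunk) can be sketched standalone. ChunkPool, Chunk, and the counters below are illustrative stand-ins, not Cassandra's real BufferPool:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of the poll -> allocate -> final-poll pattern from the diff above.
class ChunkPool {
    static final class Chunk {}

    private final ConcurrentLinkedQueue<Chunk> chunks = new ConcurrentLinkedQueue<>();
    private final int maxChunks;
    private int allocated = 0;

    ChunkPool(int maxChunks) { this.maxChunks = maxChunks; }

    /** Return a chunk, or null once the pool's memory limit is reached. */
    Chunk get() {
        Chunk chunk = chunks.poll();
        if (chunk != null)
            return chunk;

        chunk = allocateMoreChunks();
        if (chunk != null)
            return chunk;

        // another thread may have just allocated the last macro chunk,
        // so make one final attempt before returning null
        return chunks.poll();
    }

    /** Carve a macro chunk in two: queue one half, hand the other to the caller. */
    private synchronized Chunk allocateMoreChunks() {
        if (allocated >= maxChunks)
            return null; // memory limit reached
        allocated += 2;
        chunks.add(new Chunk()); // surplus chunk goes to the shared queue
        return new Chunk();      // allocating thread receives a chunk directly
    }
}
```

The point of the refactor is that last direct return: the allocating thread receives a chunk from its own allocation instead of racing other consumers for it on the shared queue.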


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #294: Fixing logging of netty client related ioexcept...

2018-11-22 Thread sumanth-pasupuleti
GitHub user sumanth-pasupuleti opened a pull request:

https://github.com/apache/cassandra/pull/294

Fixing logging of netty client related ioexceptions to be at trace level



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sumanth-pasupuleti/cassandra 
debuglevelioexceptions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/294.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #294


commit 3cd3cbe7233123f0c0461b11124a7f45c7df449e
Author: sumanthpasupuleti 
Date:   2018-11-21T17:11:06Z

Fixing logging of netty client related ioexceptions to be at trace level




---




[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher

2018-11-22 Thread belliottsmith
Github user belliottsmith commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/293#discussion_r235729649
  
--- Diff: conf/cassandra.yaml ---
@@ -1003,3 +1003,9 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former 
Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Define use of immediate flusher for replies to TCP connections. This is 
an alternate simplified flusher that does not
+# depend on any event loop scheduling. Details around why this has been 
backported from trunk: CASSANDRA-14855.
+# Default is false.
+# native_transport_use_immediate_flusher: false
--- End diff --

perhaps native_transport_flush_messages_immediately ?


---




[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher

2018-11-20 Thread sumanth-pasupuleti
Github user sumanth-pasupuleti commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/293#discussion_r235090149
  
--- Diff: conf/cassandra.yaml ---
@@ -1003,3 +1003,9 @@ windows_timer_interval: 1
 # An interval of 0 disables any wait time, which is the behavior of former 
Cassandra versions.
 #
 # otc_backlog_expiration_interval_ms: 200
+
+# Define use of immediate flusher for replies to TCP connections. This is 
an alternate simplified flusher that does not
+# depend on any event loop scheduling. Details around why this has been 
backported from trunk: CASSANDRA-14855.
+# Default is false.
+# native_transport_use_immediate_flusher: false
--- End diff --

The other, more descriptive name I had in mind was 
"native_transport_use_immediate_flusher_in_place_of_batches_flusher", but that 
felt like too long a name.


---




[GitHub] cassandra pull request #293: 14855 - 3.0 backport immediate flusher

2018-11-20 Thread sumanth-pasupuleti
GitHub user sumanth-pasupuleti opened a pull request:

https://github.com/apache/cassandra/pull/293

14855 - 3.0 backport immediate flusher



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sumanth-pasupuleti/cassandra 
3.0_backport_immediate_flusher

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/293.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #293


commit 713825b33c85099484991219e43e863a804ce96b
Author: Michael Burman 
Date:   2018-05-08T12:40:54Z

Backporting ImmediateFlusher from trunk

commit a6d4e517716a08b72166f226751c7e97d392750a
Author: sumanthpasupuleti 
Date:   2018-11-20T16:17:01Z

Making immediate flusher OFF by default




---




[GitHub] cassandra pull request #292: provide ZSTD compression support to cassandra (...

2018-11-19 Thread sushmaad
GitHub user sushmaad opened a pull request:

https://github.com/apache/cassandra/pull/292

provide ZSTD compression support to cassandra (CASSANDRA-14482)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sushmaad/cassandra 14482-4.0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/292.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #292


commit fcdf9e66c6d688eff23c5147b5ea94584d871d6d
Author: Sushma Devendrappa 
Date:   2018-11-19T23:43:50Z

provide ZSTD compression support to cassandra (CASSANDRA-14482)




---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-17 Thread sumanth-pasupuleti
Github user sumanth-pasupuleti commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r234413324
  
--- Diff: 
src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---
@@ -307,6 +320,55 @@ public static void setViewRemoved(String keyspaceName, 
String viewName)
 forceBlockingFlush(VIEW_BUILD_STATUS);
 }
 
+/**
+ * Reads blacklisted partitions from 
system_distributed.blacklisted_partitions table.
+ * Stops reading partitions upon exceeding the cache size limit by 
logging a warning.
+ * @return
+ */
+public static Set getBlacklistedPartitions()
+{
+String query = "SELECT keyspace_name, columnfamily_name, 
partition_key FROM %s.%s";
+UntypedResultSet results;
+try
+{
+results = QueryProcessor.execute(format(query, 
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, BLACKLISTED_PARTITIONS),
--- End diff --

The execute() call can be replaced with executeInternalWithPaging(), but with 
or without paging we would still end up holding the entire result set in memory 
through the "results" variable here, wouldn't we? Do you think paging would 
still help?
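A hedged sketch of the trade-off being discussed: a paged query only bounds memory if rows are consumed incrementally and reading stops at the cache cap; materializing the whole result set first defeats the point. The iterator below stands in for a paged result source, and none of these names are Cassandra's actual API:

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

// Illustrative loader: stop pulling rows once the cache limit is hit.
class BlacklistCacheLoader {
    /** Read at most cacheLimit entries, then stop and warn about truncation. */
    static Set<String> load(Iterator<String> pagedRows, int cacheLimit) {
        Set<String> cache = new HashSet<>();
        while (pagedRows.hasNext()) {
            if (cache.size() >= cacheLimit) {
                System.out.println("WARN: blacklisted-partitions cache limit reached; remaining rows skipped");
                break; // later pages are never fetched, so memory stays bounded
            }
            cache.add(pagedRows.next());
        }
        return cache;
    }
}
```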


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-17 Thread sumanth-pasupuleti
Github user sumanth-pasupuleti commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r234413081
  
--- Diff: src/java/org/apache/cassandra/service/CassandraDaemon.java ---
@@ -426,6 +426,9 @@ public void uncaughtException(Thread t, Throwable e)
 // Native transport
 nativeTransportService = new NativeTransportService();
 
+// load blacklisted partitions into cache
+StorageService.instance.refreshBlacklistedPartitionsCache();
--- End diff --

yeah, I think the first node of the new version would create the table. 
Either way, it wouldn't be a catastrophic failure for server start since 
possible failure during reading blacklisted partitions is handled.


---




[GitHub] cassandra pull request #291: Fix typo

2018-11-16 Thread kaito0228
GitHub user kaito0228 opened a pull request:

https://github.com/apache/cassandra/pull/291

Fix typo

**I fixed two typos.**

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kaito0228/cassandra FixTypo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/cassandra/pull/291.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #291


commit 15557ca5566523b7b917dad5b481a0f56da52f12
Author: ryo.s 
Date:   2018-11-16T15:50:08Z

Fix typo




---




[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-12 Thread jolynch
Github user jolynch closed the pull request at:

https://github.com/apache/cassandra/pull/212


---




[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-12 Thread aweisberg
Github user aweisberg commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r232731480
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
 {
 private static final Logger logger = 
LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
 
-private final int targetPercent;
+private final boolean blockForRemoteDcs;
 private final long timeoutNanos;
 
-public static StartupClusterConnectivityChecker create(int 
targetPercent, int timeoutSecs)
+public static StartupClusterConnectivityChecker create(long 
timeoutSecs, boolean blockForRemoteDcs)
 {
-timeoutSecs = Math.max(1, timeoutSecs);
+if (timeoutSecs < 0)
+logger.warn("skipping block-for-peers due to negative timeout. 
You may encounter errors or timeouts on" +
--- End diff --

The reason I thought it was strange was because -1 was a valid config 
option.
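Since -1 is a valid configuration value meaning "do not block", the guard logs and skips rather than clamping. A minimal sketch of that decision; the names are illustrative, not the actual StartupClusterConnectivityChecker API:

```java
// Illustrative guard: preserve negative values as a "skip blocking" sentinel.
class ConnectivityTimeout {
    /** Returns the effective wait in seconds; any negative value means skip blocking. */
    static long effectiveTimeoutSecs(long configuredSecs) {
        if (configuredSecs < 0) {
            System.out.println("WARN: skipping block-for-peers due to negative timeout");
            return -1; // sentinel: do not wait for peers at startup
        }
        return configuredSecs;
    }
}
```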


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r232436329
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -154,31 +326,203 @@ private void registerMBean()
 
 public void close()
 {
-updateSchedular.cancel(false);
-resetSchedular.cancel(false);
+if (updateScoresScheduler != null)
+updateScoresScheduler.cancel(false);
+if (updateSamplesScheduler != null)
+updateSamplesScheduler.cancel(false);
+
+for (AnnotatedMeasurement measurement : samples.values())
+{
+if (measurement.probeFuture != null)
+measurement.probeFuture.cancel(false);
+
+measurement.millisSinceLastMeasure.set(0);
+measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+measurement.probeTimerMillis = 0;
+}
 
 MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
 try
 {
-mbs.unregisterMBean(new ObjectName(mbeanName));
+if (mbeanRegistered)
+mbs.unregisterMBean(new ObjectName(mbeanName));
 }
 catch (Exception e)
 {
 throw new RuntimeException(e);
 }
 }
 
+/**
+ * Background task running on the samples dictionary. The default 
implementation sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for 
recently but have ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, 
ReplicaCollection)}.
+ */
+protected void updateSamples()
+{
+// Split calculation of probe timers from sending probes for 
testability
+calculateProbes(samples, dynamicLatencyProbeInterval);
+
+if (!StorageService.instance.isGossipActive())
+return;
+
+schedulePings(samples);
--- End diff --

Sure, done.
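The probe-selection logic described in the quoted javadoc, i.e. ping endpoints that were recently asked for in ranking but have produced no fresh latency measurement, can be sketched roughly as below. Field names and thresholds are illustrative stand-ins for AnnotatedMeasurement's actual state:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative probe planner: values are {millisSinceLastMeasure, millisSinceLastRequest}.
class ProbePlanner {
    static List<String> endpointsToProbe(Map<String, long[]> samples,
                                         long minProbeIntervalMs,
                                         long maxProbeIntervalMs) {
        List<String> toProbe = new ArrayList<>();
        for (Map.Entry<String, long[]> e : samples.entrySet()) {
            long sinceLastMeasure = e.getValue()[0];
            long sinceLastRequest = e.getValue()[1];
            // still being ranked, but no fresh timing information: worth a PING
            if (sinceLastRequest < maxProbeIntervalMs && sinceLastMeasure >= minProbeIntervalMs)
                toProbe.add(e.getKey());
        }
        return toProbe;
    }
}
```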


---




[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r232436157
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
@@ -23,128 +23,300 @@
 import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import com.codahale.metrics.Snapshot;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
+import 
org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
 /**
  * A dynamic snitch that sorts endpoints by latency with an adapted phi 
failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) 
are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this 
class.
  */
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch 
implements ILatencySubscriber, DynamicEndpointSnitchMBean
 {
-private static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
-
-private static final double ALPHA = 0.75; // set to 0.75 to make EDS 
more biased towards the newer values
-private static final int WINDOW_SIZE = 100;
-
-private volatile int dynamicUpdateInterval = 
DatabaseDescriptor.getDynamicUpdateInterval();
-private volatile int dynamicResetInterval = 
DatabaseDescriptor.getDynamicResetInterval();
-private volatile double dynamicBadnessThreshold = 
DatabaseDescriptor.getDynamicBadnessThreshold();
+private static final Logger logger = 
LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+// Latency measurement and ranking. The samples contain latency 
measurements and the scores are used for ranking
+protected boolean registered = false;
+protected static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
+protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new ConcurrentHashMap<>();
+
+// Latency probe functionality for actively probing endpoints that we 
haven't measured recently but are ranking
+public static final long MAX_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+public static final long MIN_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ;
+// The probe rate is set later when configuration is read
+protected static final RateLimiter probeRateLimiter = 
RateLimiter.create(1);
+protected static final DebuggableScheduledThreadPoolExecutor 
latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, 
"LatencyProbes", Thread.MIN_PRIORITY);
+
+// User confi

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r232435926
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---
[quoted diff identical to the hunk above]
more biased to towards the newer values
-private static final int WINDOW_SIZE = 100;
-
-private volatile int dynamicUpdateInterval = 
DatabaseDescriptor.getDynamicUpdateInterval();
-private volatile int dynamicResetInterval = 
DatabaseDescriptor.getDynamicResetInterval();
-private volatile double dynamicBadnessThreshold = 
DatabaseDescriptor.getDynamicBadnessThreshold();
+private static final Logger logger = 
LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+// Latency measurement and ranking. The samples contain latency 
measurements and the scores are used for ranking
+protected boolean registered = false;
+protected static final boolean USE_SEVERITY = 
!Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+protected volatile Map scores = new 
HashMap<>();
+protected final Map samples 
= new ConcurrentHashMap<>();
+
+// Latency probe functionality for actively probing endpoints that we 
haven't measured recently but are ranking
+public static final long MAX_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+public static final long MIN_PROBE_INTERVAL_MS = 
Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ;
+// The probe rate is set later when configuration is read
+protected static final RateLimiter probeRateLimiter = 
RateLimiter.create(1);
+protected static final DebuggableScheduledThreadPoolExecutor 
latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, 
"LatencyProbes", Thread.MIN_PRIORITY);
+
+// User confi
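
The truncated diff above introduces constants for actively probing endpoints that are still being ranked but have no recent latency samples. As a minimal sketch of that staleness check (class, field, and method names here are hypothetical, endpoints are simplified to Strings, and the real patch additionally rate-limits probes with a Guava RateLimiter on a dedicated executor):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: an endpoint is probed again only once its last latency
// measurement is older than the minimum probe interval. MAX_PROBE_INTERVAL_MS
// caps how long a ranked endpoint may go unprobed in the real class; it is
// unused in this simplified sketch.
public class ProbeScheduler {
    public static final long MIN_PROBE_INTERVAL_MS = 60 * 1000L;      // 1 minute
    public static final long MAX_PROBE_INTERVAL_MS = 60 * 10 * 1000L; // 10 minutes

    private final Map<String, Long> lastMeasuredMs = new ConcurrentHashMap<>();

    public void recordMeasurement(String endpoint, long nowMs) {
        lastMeasuredMs.put(endpoint, nowMs);
    }

    /** True when the endpoint is stale enough that an active PING probe is warranted. */
    public boolean shouldProbe(String endpoint, long nowMs) {
        Long last = lastMeasuredMs.get(endpoint);
        if (last == null)
            return true; // never measured: probe immediately
        return nowMs - last >= MIN_PROBE_INTERVAL_MS;
    }
}
```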

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r232435703
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---

[GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...

2018-11-09 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/283#discussion_r232435653
  
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java 
---

[GitHub] cassandra pull request #284: Expose schema in virtual table for CASSANDRA-14...

2018-11-09 Thread clohfink
Github user clohfink commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/284#discussion_r232325693
  
--- Diff: src/java/org/apache/cassandra/db/virtual/DescribeTables.java ---
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Streams;
+
+import org.apache.cassandra.cql3.functions.Function;
+import org.apache.cassandra.cql3.functions.FunctionName;
+import org.apache.cassandra.cql3.functions.UDAggregate;
+import org.apache.cassandra.cql3.functions.UDFunction;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SchemaCQLHelper;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.Functions;
+import org.apache.cassandra.schema.IndexMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Types;
+import org.apache.cassandra.schema.ViewMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class DescribeTables
+{
+private static final String KEYSPACE = "keyspace_name";
+private static final String CQL = "cql";
+
+private static final CompositeType utfComposite = 
CompositeType.getInstance(UTF8Type.instance, UTF8Type.instance);
+
+public static Collection getAll(String name)
+{
+return ImmutableList.of(new DescribeKeyspaceTable(name),
+new DescribeIndexesTable(name),
+new DescribeTypesTable(name),
+new DescribeAggregatesTable(name),
+new DescribeFunctionsTable(name),
+new DescribeViewsTable(name),
+new DescribeTablesTable(name));
+}
+
+static final class DescribeKeyspaceTable extends AbstractVirtualTable
+{
+DescribeKeyspaceTable(String keyspace)
+{
+super(TableMetadata.builder(keyspace, "describe_keyspace")
+   .comment("cql for keyspace metadata")
+   .kind(TableMetadata.Kind.VIRTUAL)
+   .partitioner(new 
LocalPartitioner(UTF8Type.instance))
+   .addPartitionKeyColumn(KEYSPACE, 
UTF8Type.instance)
+   .addRegularColumn(CQL, UTF8Type.instance)
+   .build());
+}
+
+@Override
+public DataSet data(DecoratedKey partitionKey)
+{
+String keyspace = 
UTF8Type.instance.compose(partitionKey.getKey());
+
+SimpleDataSet result = new SimpleDataSet(metadata());
+result.row(keyspace)
+  .column(CQL, SchemaCQLHelper.getKeyspaceAsCQL(keyspace));
+return result;
+}
+
+public DataSet data()
+{
+SimpleDataSet result = new SimpleDataSet(metadata());
+for (String keyspace : Schema.instance.getKeyspaces())
+{
+result.row(keyspace)
+  .column(CQL, 
SchemaCQLHelper.getKeyspaceAsCQL(keyspace));
+}
+return result;
+}
+}
+
+static abstract class AbstractDescribeTable extends 
AbstractVirtualTable
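
The virtual table above implements two read paths: `data(DecoratedKey)` answers a single-partition SELECT and `data()` a full scan over all keyspaces. A self-contained sketch of that split, with a plain List standing in for `Schema.instance.getKeyspaces()` and a Function for `SchemaCQLHelper.getKeyspaceAsCQL` (both hypothetical stand-ins, not the real Cassandra API):

```java
import java.util.*;
import java.util.function.Function;

// Simplified model of AbstractVirtualTable's two read paths: one row keyed by
// the requested keyspace, or one row per known keyspace.
public class DescribeSketch {
    public static Map<String, String> describeOne(String keyspace, Function<String, String> toCql) {
        // single-partition path: exactly one row, keyed by the requested keyspace
        return Collections.singletonMap(keyspace, toCql.apply(keyspace));
    }

    public static Map<String, String> describeAll(List<String> keyspaces, Function<String, String> toCql) {
        // full-scan path: one row per known keyspace, in iteration order
        Map<String, String> rows = new LinkedHashMap<>();
        for (String ks : keyspaces)
            rows.put(ks, toCql.apply(ks));
        return rows;
    }
}
```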
   

[GitHub] cassandra pull request #284: Expose schema in virtual table for CASSANDRA-14...

2018-11-09 Thread JeremiahDJordan
Github user JeremiahDJordan commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/284#discussion_r232303844
  
--- Diff: src/java/org/apache/cassandra/db/virtual/DescribeTables.java ---

[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231711085
  
--- Diff: src/java/org/apache/cassandra/config/Config.java ---
@@ -376,9 +376,31 @@
 
 public String full_query_log_dir = null;
 
-// parameters to adjust how much to delay startup until a certain 
amount of the cluster is connect to and marked alive
-public int block_for_peers_percentage = 70;
+/**
+ * When a node first starts up it initially thinks all other peers are 
DOWN, and then as the initial gossip
+ * broadcast messages come back nodes transition to UP. These options 
configure how many nodes can remain in
+ * DOWN state before we make this node available as a coordinator, as 
well as an overall timeout on this process
+ * to ensure that startup is not delayed too much.
+ *
+ * The defaults are tuned for LOCAL_ONE consistency levels with RF=3, 
and have natural settings for other CLs:
+ *
+ * Consistency Level | local_dc | all_dcs
+ * 
+ * LOCAL_ONE | default (2)  | default (any)
+ * LOCAL_QUORUM  | 1| default (any)
+ * ONE   | any  | RF - 1
+ * QUORUM| any  | (RF / 2) - 1
+ * ALL   | default  | 0
+ *
+ * A concrete example with QUORUM would be if you have 3 replicas in 2 
datacenters, then you would set
+ * block_for_peers_all_dcs to (6 / 2) - 1 = 2 because that guarantees 
that at most 2 hosts in all datacenters
+ * are down when you start taking client traffic, which should 
satisfy QUORUM for all RF=6 QUORUM queries.
+ */
+public int block_for_peers_local_dc = 2;
--- End diff --

We ended up going the direction of a boolean.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org
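
The javadoc table in the diff above boils down to simple arithmetic per consistency level. A hypothetical helper reproducing its all-DC column (class and method names are illustrative, not part of the patch): how many hosts may remain DOWN across all datacenters at startup while the given consistency level stays satisfiable at total replication factor `totalRf`.

```java
// Reproduces the "all_dcs" column of the quoted table. For QUORUM the rule
// (totalRf / 2) - 1 matches the diff's worked example: with 3 replicas in each
// of 2 datacenters, totalRf = 6, quorum = 4 nodes up, so 2 may be down.
public class StartupBlockMath {
    public static int maxDownAllDcs(String consistencyLevel, int totalRf) {
        switch (consistencyLevel) {
            case "ONE":    return totalRf - 1;        // one live replica is enough
            case "QUORUM": return (totalRf / 2) - 1;  // quorum needs totalRf/2 + 1 up
            case "ALL":    return 0;                  // every replica must be up
            default:
                throw new IllegalArgumentException("no fixed rule for " + consistencyLevel);
        }
    }
}
```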



[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231705256
  
--- Diff: 
test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java 
---
@@ -69,33 +113,102 @@ public void tearDown()
 @Test
 public void execute_HappyPath()
 {
-Sink sink = new Sink(true, true);
+Sink sink = new Sink(true, true, peers);
 MessagingService.instance().addMessageSink(sink);
-Assert.assertTrue(connectivityChecker.execute(peers));
+Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, 
this::getDatacenter));
 checkAllConnectionTypesSeen(sink);
 }
 
 @Test
 public void execute_NotAlive()
 {
-Sink sink = new Sink(false, true);
+Sink sink = new Sink(false, true, peers);
 MessagingService.instance().addMessageSink(sink);
-Assert.assertFalse(connectivityChecker.execute(peers));
+Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, 
this::getDatacenter));
 checkAllConnectionTypesSeen(sink);
 }
 
 @Test
 public void execute_NoConnectionsAcks()
 {
-Sink sink = new Sink(true, false);
+Sink sink = new Sink(true, false, peers);
 MessagingService.instance().addMessageSink(sink);
-Assert.assertFalse(connectivityChecker.execute(peers));
+Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, 
this::getDatacenter));
+}
+
+@Test
+public void execute_LocalQuorum()
+{
+// local peer plus 3 peers from same dc shouldn't pass (4/6)
+Set available = new HashSet<>();
+copyCount(peersAMinusLocal, available, NUM_PER_DC - 3);
+checkAvailable(localQuorumConnectivityChecker, available, false, 
true);
+
+// local peer plus 4 peers from same dc should pass (5/6)
+available.clear();
+copyCount(peersAMinusLocal, available, NUM_PER_DC - 2);
+checkAvailable(localQuorumConnectivityChecker, available, true, 
true);
+}
+
+@Test
+public void execute_GlobalQuorum()
+{
+// local dc passing shouldn'nt pass globally with two hosts down 
in datacenterB
+Set available = new HashSet<>();
+copyCount(peersAMinusLocal, available, NUM_PER_DC - 2);
+copyCount(peersB, available, NUM_PER_DC - 2);
+copyCount(peersC, available, NUM_PER_DC - 1);
+checkAvailable(globalQuorumConnectivityChecker, available, false, 
true);
+
+available.clear();
+copyCount(peersAMinusLocal, available, NUM_PER_DC -2);
+copyCount(peersB, available, NUM_PER_DC - 1);
+copyCount(peersC, available, NUM_PER_DC - 1);
+checkAvailable(globalQuorumConnectivityChecker, available, true, 
true);
+}
+
+@Test
+public void execute_Noop()
+{
+checkAvailable(noopChecker, new HashSet<>(), true, false);
+}
+
+@Test
+public void execute_ZeroWait()
+{
+checkAvailable(zeroWaitChecker, new HashSet<>(), false, false);
--- End diff --

Makes sense, I refactored this test to look at that instead of just 
returning immediately.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231697341
  
--- Diff: 
test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java 
---
@@ -69,33 +113,102 @@ public void tearDown()
 @Test
 public void execute_HappyPath()
 {
-Sink sink = new Sink(true, true);
+Sink sink = new Sink(true, true, peers);
 MessagingService.instance().addMessageSink(sink);
-Assert.assertTrue(connectivityChecker.execute(peers));
+Assert.assertTrue(localQuorumConnectivityChecker.execute(peers, 
this::getDatacenter));
 checkAllConnectionTypesSeen(sink);
 }
 
 @Test
 public void execute_NotAlive()
 {
-Sink sink = new Sink(false, true);
+Sink sink = new Sink(false, true, peers);
 MessagingService.instance().addMessageSink(sink);
-Assert.assertFalse(connectivityChecker.execute(peers));
+Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, 
this::getDatacenter));
 checkAllConnectionTypesSeen(sink);
 }
 
 @Test
 public void execute_NoConnectionsAcks()
 {
-Sink sink = new Sink(true, false);
+Sink sink = new Sink(true, false, peers);
 MessagingService.instance().addMessageSink(sink);
-Assert.assertFalse(connectivityChecker.execute(peers));
+Assert.assertFalse(localQuorumConnectivityChecker.execute(peers, 
this::getDatacenter));
+}
+
+@Test
+public void execute_LocalQuorum()
+{
+// local peer plus 3 peers from same dc shouldn't pass (4/6)
+Set available = new HashSet<>();
+copyCount(peersAMinusLocal, available, NUM_PER_DC - 3);
+checkAvailable(localQuorumConnectivityChecker, available, false, 
true);
+
+// local peer plus 4 peers from same dc should pass (5/6)
+available.clear();
+copyCount(peersAMinusLocal, available, NUM_PER_DC - 2);
+checkAvailable(localQuorumConnectivityChecker, available, true, 
true);
+}
+
+@Test
+public void execute_GlobalQuorum()
+{
+// local dc passing shouldn'nt pass globally with two hosts down 
in datacenterB
--- End diff --

Ack, fixed.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231696993
  
--- Diff: 
test/unit/org/apache/cassandra/net/StartupClusterConnectivityCheckerTest.java 
---
@@ -36,13 +36,34 @@
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.HeartBeatState;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static 
org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
 
 public class StartupClusterConnectivityCheckerTest
 {
-private StartupClusterConnectivityChecker connectivityChecker;
+private StartupClusterConnectivityChecker 
localQuorumConnectivityChecker;
+private StartupClusterConnectivityChecker 
globalQuorumConnectivityChecker;
+private StartupClusterConnectivityChecker noopChecker;
+private StartupClusterConnectivityChecker zeroWaitChecker;
+
+private static final int NUM_PER_DC = 6;
 private Set peers;
+private Set peersA;
+private Set peersAMinusLocal;
+private Set peersB;
+private Set peersC;
+
+private String getDatacenter(InetAddressAndPort endpoint)
+{
+if (peersA.contains(endpoint))
+return "datacenterA";
+if (peersB.contains(endpoint))
+return "datacenterB";
+else if (peersC.contains(endpoint))
+return "datacenterC";
+return "NA";
--- End diff --

Right, so it'll fail fast, done.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231696207
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
 {
     private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
 
-    private final int targetPercent;
+    private final boolean blockForRemoteDcs;
     private final long timeoutNanos;
 
-    public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs)
+    public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs)
     {
-        timeoutSecs = Math.max(1, timeoutSecs);
+        if (timeoutSecs < 0)
+            logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" +
+                        " the first user query");
         if (timeoutSecs > 100)
             logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
         long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
 
-        return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos);
+        return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs);
     }
 
     @VisibleForTesting
-    StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos)
+    StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs)
     {
-        this.targetPercent = Math.min(100, Math.max(0, targetPercent));
+        this.blockForRemoteDcs = blockForRemoteDcs;
         this.timeoutNanos = timeoutNanos;
     }
 
     /**
     * @param peers The currently known peers in the cluster; argument is not modified.
+     * @param getDatacenterSource A function for mapping peers to their datacenter.
     * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
     * else false.
     */
-    public boolean execute(Set<InetAddressAndPort> peers)
+    public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
     {
-        if (targetPercent == 0 || peers == null)
+        if (peers == null || this.timeoutNanos < 0)
             return true;
 
         // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
         peers = new HashSet<>(peers);
-        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
+        String localDc = getDatacenterSource.apply(localAddress);
 
+        peers.remove(localAddress);
         if (peers.isEmpty())
             return true;
 
-        logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds",
-                    targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+        // make a copy of the datacenter mapping (in case gossip updates happen during this method or some such)
+        Map<InetAddressAndPort, String> datacenterMap = peers.stream()
+                                                             .collect(Collectors.toMap(k -> k, getDatacenterSource));
+        Function<InetAddressAndPort, String> getDatacenter = datacenterMap::get;
 
-        long startNanos = System.nanoTime();
+        Map<String, Set<InetAddressAndPort>> peersByDc = peers.stream()
+                                                              .collect(Collectors.groupingBy(getDatacenter, Collectors.toSet()));
+
+        if (!blockForRemoteDcs)
+        {
+            peersByDc.keySet().retainAll(Collections.singleton(localDc));
+            logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s",
+                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+        }
+        else
+        {
+            logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s",
+                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
+        }
 
         AckMap acks = new AckMap(3);
-        int target = (int) ((targetPercent / 100.0) * peers.size());
-        CountDownLatch latch = new CountDownLatch(target);
+        Map<String, CountDownLatch> latchMap = new HashMap<>(peersByDc.size());
+        for (Map.Entry<String, Set<InetAddressAndPort>> entry : peersByDc.entrySet())
+        {
+            latchMap.put(entry.getKey(), new CountDownLatch(Math.max(entry.getValue().size() - 1, 0)));
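The per-datacenter countdown in the diff above can be sketched as a small self-contained model. This is an illustration only, under stated assumptions: plain strings stand in for `InetAddressAndPort`, and the class/method names (`LatchSketch`, `latchesByDc`) are hypothetical; only the datacenter grouping and the `size - 1` latch sizing come from the patch.

```java
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import java.util.stream.Collectors;

public class LatchSketch
{
    /**
     * Builds one CountDownLatch per datacenter, sized so startup may proceed
     * once at most a single peer per DC is still unacknowledged
     * (size - 1, floored at zero), mirroring the patch's latch sizing.
     */
    public static Map<String, CountDownLatch> latchesByDc(Set<String> peers,
                                                          Function<String, String> getDatacenter)
    {
        // group peers by their datacenter
        Map<String, Set<String>> peersByDc =
            peers.stream().collect(Collectors.groupingBy(getDatacenter, Collectors.toSet()));

        // one latch per DC, counting down as peers are acknowledged
        Map<String, CountDownLatch> dcToRemainingPeers = new HashMap<>(peersByDc.size());
        for (Map.Entry<String, Set<String>> entry : peersByDc.entrySet())
            dcToRemainingPeers.put(entry.getKey(),
                                   new CountDownLatch(Math.max(entry.getValue().size() - 1, 0)));
        return dcToRemainingPeers;
    }

    public static void main(String[] args)
    {
        // Two DCs: dc1 has 3 peers, dc2 has 1 peer.
        Set<String> peers = new HashSet<>(Arrays.asList("a", "b", "c", "d"));
        Function<String, String> dcOf = p -> p.equals("d") ? "dc2" : "dc1";

        Map<String, CountDownLatch> latches = latchesByDc(peers, dcOf);
        System.out.println(latches.get("dc1").getCount()); // 2 (3 peers - 1)
        System.out.println(latches.get("dc2").getCount()); // 0 (single peer, floored at 0)
    }
}
```

The `size - 1` sizing is what lets a node with exactly one dead peer per DC still finish startup rather than waiting for the full timeout.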

[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231695792
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
[...]
+        if (!blockForRemoteDcs)
+        {
+            peersByDc.keySet().retainAll(Collections.singleton(localDc));
--- End diff --

Ack, added the comment.


---

-
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org



[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231695413
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
[...]
         AckMap acks = new AckMap(3);
-        int target = (int) ((targetPercent / 100.0) * peers.size());
-        CountDownLatch latch = new CountDownLatch(target);
+        Map<String, CountDownLatch> latchMap = new HashMap<>(peersByDc.size());
--- End diff --

I called it `dcToRemainingPeers` to try to convey what it was counting 
down. Let me know if that's not clear and I'll change it.


---


[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231693230
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
[...]
+        Map<String, CountDownLatch> latchMap = new HashMap<>(peersByDc.size());
+        for (Map.Entry<String, Set<InetAddressAndPort>> entry : peersByDc.entrySet())
+        {
+            latchMap.put(entry.getKey(), new CountDownLatch(Math.max(entry.getValue().size() - 1, 0)));

[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231691984
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
[...]
+        Map<String, CountDownLatch> latchMap = new HashMap<>(peersByDc.size());
+        for (Map.Entry<String, Set<InetAddressAndPort>> entry : peersByDc.entrySet())
+        {
+            latchMap.put(entry.getKey(), new CountDownLatch(Math.max(entry.getValue().size() - 1, 0)));

[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231688792
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
[...]
-        long startNanos = System.nanoTime();
+        Map<String, Set<InetAddressAndPort>> peersByDc = peers.stream()
--- End diff --

Sure, done.


---




[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231688756
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
[...]
+        // make a copy of the datacenter mapping (in case gossip updates happen during this method or some such)
+        Map<InetAddressAndPort, String> datacenterMap = peers.stream()
--- End diff --

Sure thing, makes sense and I've made both refactors.


---




[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231681120
  
--- Diff: 
src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
@@ -48,81 +51,133 @@
[...]
-        timeoutSecs = Math.max(1, timeoutSecs);
+        if (timeoutSecs < 0)
+            logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" +
--- End diff --

I thought it should be for the same reason we warn them if they set the 
timeout to more than 100, but I don't care too much. I removed it.


---




[GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...

2018-11-07 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/212#discussion_r231680735
  
--- Diff: src/java/org/apache/cassandra/config/Config.java ---
@@ -380,9 +380,23 @@
     public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
     public int repair_command_pool_size = concurrent_validations;
 
-    // parameters to adjust how much to delay startup until a certain amount of the cluster is connected to and marked alive
-    public int block_for_peers_percentage = 70;
+    /**
+     * When a node first starts up it initially considers all other peers as DOWN, and then as the initial gossip
+     * broadcast messages come back nodes transition to UP. These options configure how long we wait for peers to
+     * connect before we make this node available as a coordinator. Furthermore, if this feature is enabled
+     * (timeout >= 0) Cassandra initiates the non gossip channel internode connections on startup as well and waits
--- End diff --

Ok, I reworded it. I think that having an escape hatch makes sense, what do 
you think? 


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-05 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r230865548
  
--- Diff: src/java/org/apache/cassandra/service/CassandraDaemon.java ---
@@ -426,6 +426,9 @@ public void uncaughtException(Thread t, Throwable e)
         // Native transport
         nativeTransportService = new NativeTransportService();
 
+        // load blacklisted partitions into cache
+        StorageService.instance.refreshBlacklistedPartitionsCache();
--- End diff --

During a cluster upgrade, I'm not sure that we can assume this call will 
succeed. I'm not entirely sure how that works tbh with system_distributed 
tables. I guess that the first node of the new version will create the table 
maybe?


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-05 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r230865112
  
--- Diff: 
src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java ---
@@ -307,6 +320,55 @@ public static void setViewRemoved(String keyspaceName, String viewName)
         forceBlockingFlush(VIEW_BUILD_STATUS);
     }
 
+    /**
+     * Reads blacklisted partitions from the system_distributed.blacklisted_partitions table.
+     * Stops reading partitions upon exceeding the cache size limit, logging a warning.
+     * @return
+     */
+    public static Set<BlacklistedPartition> getBlacklistedPartitions()
+    {
+        String query = "SELECT keyspace_name, columnfamily_name, partition_key FROM %s.%s";
+        UntypedResultSet results;
+        try
+        {
+            results = QueryProcessor.execute(format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, BLACKLISTED_PARTITIONS),
--- End diff --

I think there is a local execution option with pagination (which should avoid an OOM, right)?
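The size-capped load being discussed can be illustrated with a self-contained sketch. Everything here is hypothetical (the class name, `loadCapped`, and the in-memory page iterator); the real patch would page through `system_distributed.blacklisted_partitions` instead, but the stop-at-the-cap behaviour is the same idea.

```java
import java.util.*;

public class CappedLoadSketch
{
    /**
     * Consumes a paged source of keys, stopping (with a warning) once the
     * cache size limit is reached, so an oversized table cannot blow up the
     * heap. Hypothetical illustration of the capped-cache behaviour.
     */
    public static Set<String> loadCapped(Iterator<List<String>> pages, int maxEntries)
    {
        Set<String> cache = new HashSet<>();
        while (pages.hasNext())
        {
            for (String key : pages.next())
            {
                if (cache.size() >= maxEntries)
                {
                    // warn and ignore the remaining rows rather than keep loading
                    System.err.println("blacklisted-partitions cache limit reached; ignoring remaining rows");
                    return cache;
                }
                cache.add(key);
            }
        }
        return cache;
    }

    public static void main(String[] args)
    {
        Iterator<List<String>> pages = Arrays.asList(
            Arrays.asList("k1", "k2"), Arrays.asList("k3", "k4")).iterator();
        System.out.println(loadCapped(pages, 3).size()); // 3 (k4 is dropped at the cap)
    }
}
```

Pagination bounds memory per fetch; the cap bounds total cache size, and the two together are what keep the load OOM-safe.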


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-05 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r230864957
  
--- Diff: src/java/org/apache/cassandra/cache/BlacklistedPartition.java ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cache;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Class to represent a blacklisted partition
+ */
+public class BlacklistedPartition implements IMeasurableMemory
--- End diff --

Hm, not sure I agree since this is in the same package, but I don't really 
care :-)


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-05 Thread jolynch
Github user jolynch commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r230864731
  
--- Diff: doc/source/operating/blacklisting_partitions.rst ---
@@ -0,0 +1,63 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+..
+.. http://www.apache.org/licenses/LICENSE-2.0
+..
+.. Unless required by applicable law or agreed to in writing, software
+.. distributed under the License is distributed on an "AS IS" BASIS,
+.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+.. See the License for the specific language governing permissions and
+.. limitations under the License.
+
+.. highlight:: none
+
+
+Blacklisting Partitions (CASSANDRA-12106)
+-
+
+This feature allows partition keys to be blacklisted i.e. Cassandra would 
not process following operations on those
+blacklisted partitions.
+
+- Point READs
+
+Response would be InvalidQueryException.
+
+It is important to note that, this would not have any effect on range 
reads or write operations.
+
+How to blacklist a partition key
+
+``system_distributed.blacklisted_partitions`` table can be used to 
blacklist partitions. Essentially, you will have to
+insert a new row into the table with the following details:
+
+- Keyspace name
+- Table name
+- Partition key
+
+The partition key must be specified in exactly the same format that ``nodetool getendpoints`` accepts.
+Following are several examples of blacklisting partition keys in a keyspace ``ks`` and table ``table1``, for different
+possibilities of the table1 partition key:
+
+- For a single-column partition key, say ``Id``:
+
+  - ``Id`` is a simple type: ``insert into system_distributed.blacklisted_partitions (keyspace_name, columnfamily_name, partition_key) VALUES ('ks','table1','1');``
+  - ``Id`` is a blob: ``insert into system_distributed.blacklisted_partitions (keyspace_name, columnfamily_name, partition_key) VALUES ('ks','table1','12345f');``
+  - ``Id`` has a colon: ``insert into system_distributed.blacklisted_partitions (keyspace_name, columnfamily_name, partition_key) VALUES ('ks','table1','1\:2');``
+
+- For a composite-column partition key, say ``(Key1, Key2)``:
+
+  - ``insert into system_distributed.blacklisted_partitions (keyspace_name, columnfamily_name, partition_key) VALUES ('ks','table1','k11:k21');``
+
+BlacklistedPartitions Cache
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Cassandra internally maintains an on-heap cache that loads partition keys from ``system_distributed.blacklisted_partitions``.
+Any partition key that references a non-existent keyspace or table is not cached.
+
+The cache is refreshed in the following ways:
+
+- During Cassandra startup
+- By a scheduled cache refresh, at a default interval of 10 minutes (can be overridden with the ``blacklisted_partitions_cache_refresh_period_in_sec`` yaml property)
+- By running ``nodetool refreshblacklistedpartitionscache``
+
+There is no bound on how much on-heap memory this cache can occupy, so be cautious about how many keys you blacklist.
+The ``blacklisted_partitions_cache_size_warn_threshold_in_mb`` yaml property can be used to get notified (via warning
+logs) when the cache size exceeds the set threshold.
--- End diff --

The docs have a `cassandra.yaml` section that you can link to to share 
context on configuration options. I was suggesting you could link to that page 
potentially? 
http://cassandra.apache.org/doc/latest/configuration/cassandra_config_file.html
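For readers following the thread, the key-encoding convention described in the quoted doc diff (key components joined with colons, literal colons escaped as ``\:``) can be sketched roughly as follows. ``format_partition_key`` is a hypothetical illustration for this thread, not code from the PR:

```python
def format_partition_key(*components):
    """Join partition-key components the way the quoted docs describe:
    colon-separated, with any literal ':' escaped as '\\:'.
    Purely illustrative -- not Cassandra's actual implementation."""
    return ":".join(str(c).replace(":", "\\:") for c in components)

# Single-column key whose value contains a colon, matching the
# blacklisted_partitions example in the diff above.
print(format_partition_key("1:2"))          # 1\:2
# Composite key (Key1, Key2) from the composite-key example.
print(format_partition_key("k11", "k21"))   # k11:k21
```

This mirrors the diff's examples: a plain value passes through unchanged, while a value containing ``:`` is escaped so it cannot be confused with a composite-key separator.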


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-03 Thread sumanth-pasupuleti
Github user sumanth-pasupuleti commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r230548740
  
--- Diff: 
test/unit/org/apache/cassandra/cache/BlacklistedPartitionsCacheTest.java ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.cache;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.repair.SystemDistributedKeyspace;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class BlacklistedPartitionsCacheTest extends CQLTester
--- End diff --

makes sense. Added a test case where reading blacklist fails due to absence 
of blacklisted_partitions table


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-03 Thread sumanth-pasupuleti
Github user sumanth-pasupuleti commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r230548472
  
--- Diff: 
src/java/org/apache/cassandra/tools/nodetool/BlacklistedPartitions.java ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import io.airlift.airline.Command;
+import io.airlift.airline.Option;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+@Command(name = "blacklistedpartitions", description = "Refresh blacklisted partitions cache from system_distributed.blacklisted_partitions table")
+public class BlacklistedPartitions extends NodeTool.NodeToolCmd
+{
+@Option(title = "refresh_cache", name = { "--refresh-cache"}, description = "Refresh blacklisted partitions cache. Default = false.")
--- End diff --

done


---




[GitHub] cassandra pull request #277: 12106 - blacklisting bad partitions for point r...

2018-11-03 Thread sumanth-pasupuleti
Github user sumanth-pasupuleti commented on a diff in the pull request:

https://github.com/apache/cassandra/pull/277#discussion_r230548454
  
--- Diff: src/java/org/apache/cassandra/service/CassandraDaemon.java ---
@@ -426,6 +426,9 @@ public void uncaughtException(Thread t, Throwable e)
 // Native transport
 nativeTransportService = new NativeTransportService();
 
+// load blacklisted partitions into cache
+StorageService.instance.refreshBlacklistedPartitionsCache();
--- End diff --

can you elaborate please? not having what?
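As a thread-side illustration of the refresh model under discussion (load at startup as in the diff above, periodic reload, manual refresh a la nodetool), here is a minimal sketch. The class and method names are hypothetical, not the PR's actual Java implementation:

```python
import threading

class BlacklistedPartitionsCacheSketch:
    """Hypothetical sketch of the refresh behaviour discussed in this
    thread: the cache holds an immutable snapshot that is rebuilt
    wholesale on every refresh, never mutated in place."""

    def __init__(self, loader):
        # 'loader' stands in for reading system_distributed.blacklisted_partitions.
        self._loader = loader
        self._lock = threading.Lock()
        self._keys = frozenset()
        self.refresh()  # startup load, as the daemon does on boot

    def refresh(self):
        # Invoked at startup, by a scheduled task (default every 10 minutes),
        # or manually (a la the nodetool command in this PR).
        snapshot = frozenset(self._loader())
        with self._lock:
            self._keys = snapshot

    def is_blacklisted(self, keyspace, table, partition_key):
        with self._lock:
            return (keyspace, table, partition_key) in self._keys

# Usage: blacklist one partition, then pick up a new entry after refresh().
rows = {("ks", "table1", "k11:k21")}
cache = BlacklistedPartitionsCacheSketch(lambda: rows)
print(cache.is_blacklisted("ks", "table1", "k11:k21"))  # True
rows.add(("ks", "table1", "1\\:2"))
cache.refresh()
print(cache.is_blacklisted("ks", "table1", "1\\:2"))    # True
```

The snapshot-swap design means point reads only ever take a brief lock to read an immutable set, which is one way to keep the blacklist check cheap on the hot path.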


---



