dcapwell commented on code in PR #2804:
URL: https://github.com/apache/cassandra/pull/2804#discussion_r1358770645


##########
src/java/org/apache/cassandra/repair/CommonRange.java:
##########
@@ -21,21 +21,30 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
+import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.ObjectSizes;
 
 /**
  * Groups ranges with identical endpoints/transient endpoints
  */
-public class CommonRange
+public class CommonRange implements IMeasurableMemory
 {
+    // REVIEW: I've heard from some that these should be on a single line, others prefer them split out into separate lines

Review Comment:
   
https://cassandra.apache.org/_/development/code_style.html#multiline-statements
   
   > Where possible prefer keeping a logical action to a single line. Prefer introducing additional variables, or well-named methods encapsulating actions, to multi-line statements - unless this harms clarity (e.g. in an already short method).
   > Try to keep lines under 120 characters, but use good judgment. It is better to exceed this limit, than to split a line that has no natural splitting points, particularly when the remainder of the line is boilerplate or easily inferred by the reader.



##########
src/java/org/apache/cassandra/utils/ObjectSizes.java:
##########
@@ -47,6 +51,8 @@ public class ObjectSizes
 
     public static final long IPV6_SOCKET_ADDRESS_SIZE = ObjectSizes.measureDeep(new InetSocketAddress(getIpvAddress(16), 42));
 
+    private static final long UUID_SIZE = measure(new UUID(0L, 0L));

Review Comment:
   this is dead code



##########
src/java/org/apache/cassandra/repair/state/WeightedHierarchy.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair.state;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+
+/**
+ * A pattern for tracking the weight of a hierarchy of references. Registered objects are treated as though they are
+ * immutable, and are only measured on registration.
+ *
+ * Internal nodes must maintain a reference to the root, in order to notify it of any new registrations that may impact
+ * the total weight of the hierarchy.
+ */
+class WeightedHierarchy
+{
+    interface Node
+    {
+        // How much does this object retain without any of its nested state
+        long independentRetainedSize();
+    }
+
+    interface InternalNode extends Node
+    {
+        Root root();
+
+        default void onNestedStateRegistration(Node nested)
+        {
+            Root root = root();
+            root.totalNestedRetainedSize().addAndGet(nested.independentRetainedSize());
+            root.onRetainedSizeUpdate();
+        }
+    }
+
+    interface Root extends InternalNode, IMeasurableMemory
+    {
+        AtomicLong totalNestedRetainedSize();

Review Comment:
   this is leaky... why return the `AtomicLong` rather than return `long`?
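A minimal sketch of the non-leaky alternative the question points at, assuming the counter moves into the implementing class: `Root` exposes a read-only `long` plus one narrow mutator, so callers can no longer `set()` or reset the shared total. `addNestedRetainedSize`, `ExampleRoot` and `ExampleRootDemo` are hypothetical names, not part of the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for the PR's Node/Root; addNestedRetainedSize(long) is a hypothetical
// replacement for handing out the AtomicLong itself.
interface Node
{
    // Bytes retained by this object alone, excluding nested state.
    long independentRetainedSize();
}

interface Root extends Node
{
    // Snapshot of the nested total; a returned long cannot be mutated by the caller.
    long totalNestedRetainedSize();

    // The only write path, so the implementation controls how the total is stored.
    void addNestedRetainedSize(long delta);
}

class ExampleRoot implements Root
{
    // Private: no caller can call set(), getAndSet() or lazySet() on it.
    private final AtomicLong nestedRetainedSize = new AtomicLong();
    private final long selfSize;

    ExampleRoot(long selfSize)
    {
        this.selfSize = selfSize;
    }

    @Override
    public long independentRetainedSize()
    {
        return selfSize;
    }

    @Override
    public long totalNestedRetainedSize()
    {
        return nestedRetainedSize.get();
    }

    @Override
    public void addNestedRetainedSize(long delta)
    {
        nestedRetainedSize.addAndGet(delta);
    }
}

class ExampleRootDemo
{
    public static void main(String[] args)
    {
        ExampleRoot root = new ExampleRoot(64);
        root.addNestedRetainedSize(100);
        root.addNestedRetainedSize(36);
        // 64 own bytes + 136 nested bytes
        System.out.println(root.independentRetainedSize() + root.totalNestedRetainedSize()); // prints 200
    }
}
```

With this shape, `InternalNode.onNestedStateRegistration` would call `root.addNestedRetainedSize(nested.independentRetainedSize())` instead of `root.totalNestedRetainedSize().addAndGet(...)`.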



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -245,16 +250,64 @@ public ActiveRepairService(SharedContext ctx)
                                              .build();
 
         DurationSpec.LongNanosecondsBound duration = getRepairStateExpires();
-        int numElements = getRepairStateSize();
-        logger.info("Storing repair state for {} or for {} elements", duration, numElements);
-        repairs = CacheBuilder.newBuilder()
+        DataStorageSpec.IntBytesBound maxRetainedSize = getRepairStateHeapSize();
+        Integer numElements = getRepairStateSize();
+        if (numElements != null)
+        {
+            logger.info("Storing repair state for {} or for {} cached elements", duration, numElements);
+            repairs = Caffeine.newBuilder()
                               .expireAfterWrite(duration.quantity(), duration.unit())
                               .maximumSize(numElements)
+                              .executor(ImmediateExecutor.INSTANCE)
                               .build();
-        participates = CacheBuilder.newBuilder()
+            participates = Caffeine.newBuilder()
                                    .expireAfterWrite(duration.quantity(), duration.unit())
                                    .maximumSize(numElements)
+                                   .executor(ImmediateExecutor.INSTANCE)
                                    .build();
+        }
+        else if (maxRetainedSize != null)
+        {
+            logger.info("Storing repair state for {} or for {} retained size", duration, maxRetainedSize);
+            repairs = Caffeine.newBuilder()
+                              .expireAfterWrite(duration.quantity(), duration.unit())
+                              .weigher(new Weigher<TimeUUID, CoordinatorState>()
+                              {
+                                  @Override
+                                  public int weigh(TimeUUID id, CoordinatorState coordinatorState)
+                                  {
+                                      // REVIEW: This addition is unchecked - I was planning to not check memory accounting
+                                      // additions into long because the maximum before overflow is ~thousands of PB, but

Review Comment:
   sounds good to me
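For reference, the policy in the REVIEW comment (sum in `long`, check only the narrowing cast) can be sketched without Guava as below. `WeightClamp` is a hypothetical name, and the clamp is written out to show what Guava's `Ints.saturatedCast` does. One correction to the comment: a Java `int` is signed, so the saturation point is `Integer.MAX_VALUE` (2^31 - 1 bytes, about 2 GiB), not 4 GB.

```java
class WeightClamp
{
    // Sizes are summed as long, which only overflows past Long.MAX_VALUE bytes (about 9.2 EB),
    // so the one checked step is the narrowing to the int that a cache weigher must return.
    static int toWeight(long retainedBytes)
    {
        // Saturating cast, equivalent to Guava's Ints.saturatedCast: clamp instead of wrapping.
        int clamped = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, retainedBytes));
        // Floor at 1 as the diff does, so no entry ends up with a zero (or negative) weight.
        return Math.max(1, clamped);
    }

    public static void main(String[] args)
    {
        long threeGiB = 3L * 1024 * 1024 * 1024;
        System.out.println(toWeight(0));        // prints 1
        System.out.println(toWeight(4_096));    // prints 4096
        System.out.println(toWeight(threeGiB)); // prints 2147483647
        System.out.println((int) threeGiB);     // a plain cast wraps: prints -1073741824
    }
}
```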



##########
src/java/org/apache/cassandra/repair/state/WeightedHierarchy.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.cassandra.repair.state;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+
+/**
+ * A pattern for tracking the weight of a hierarchy of references. Registered objects are treated as though they are
+ * immutable, and are only measured on registration.
+ *
+ * Internal nodes must maintain a reference to the root, in order to notify it of any new registrations that may impact
+ * the total weight of the hierarchy.
+ */
+class WeightedHierarchy
+{
+    interface Node
+    {
+        // How much does this object retain without any of its nested state
+        long independentRetainedSize();
+    }
+
+    interface InternalNode extends Node
+    {
+        Root root();
+
+        default void onNestedStateRegistration(Node nested)
+        {
+            Root root = root();
+            root.totalNestedRetainedSize().addAndGet(nested.independentRetainedSize());
+            root.onRetainedSizeUpdate();
+        }
+    }
+
+    interface Root extends InternalNode, IMeasurableMemory

Review Comment:
   based on the usage, we really don't care about `totalNestedRetainedSize` and `independentRetainedSize`... we only care about `unsharedHeapSize`
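To illustrate the point, assuming both `CoordinatorState` and `ParticipateState` keep implementing `IMeasurableMemory` through `Root`: the cache code then needs a single method, and one weigher body can serve both caches. `MeasurableMemory` below is a local stand-in for Cassandra's `org.apache.cassandra.cache.IMeasurableMemory` (which declares `long unsharedHeapSize()`), and `HeapWeigher` is hypothetical.

```java
// Local stand-in for org.apache.cassandra.cache.IMeasurableMemory so the sketch compiles alone.
interface MeasurableMemory
{
    long unsharedHeapSize();
}

class HeapWeigher
{
    // Works for any cached value that can report its heap size; callers never see how the
    // total is split between the object itself and its nested state.
    static int weigh(MeasurableMemory value)
    {
        long retained = value.unsharedHeapSize();
        // Clamp into [1, Integer.MAX_VALUE] before narrowing, mirroring the diff's weigher.
        return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, retained));
    }

    public static void main(String[] args)
    {
        MeasurableMemory coordinator = () -> 2_048L; // hypothetical CoordinatorState size
        MeasurableMemory participant = () -> 512L;   // hypothetical ParticipateState size
        System.out.println(weigh(coordinator)); // prints 2048
        System.out.println(weigh(participant)); // prints 512
    }
}
```

Since Caffeine's `Weigher` has a single abstract method, the anonymous classes in the diff could then become a lambda such as `(id, state) -> HeapWeigher.weigh(state)`, with the parameter type changed to `IMeasurableMemory` in real code.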



##########
src/java/org/apache/cassandra/repair/state/WeightedHierarchy.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.cassandra.repair.state;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+
+/**
+ * A pattern for tracking the weight of a hierarchy of references. Registered objects are treated as though they are
+ * immutable, and are only measured on registration.
+ *
+ * Internal nodes must maintain a reference to the root, in order to notify it of any new registrations that may impact
+ * the total weight of the hierarchy.
+ */
+class WeightedHierarchy
+{
+    interface Node
+    {
+        // How much does this object retain without any of its nested state
+        long independentRetainedSize();
+    }
+
+    interface InternalNode extends Node
+    {
+        Root root();
+
+        default void onNestedStateRegistration(Node nested)
+        {
+            Root root = root();
+            root.totalNestedRetainedSize().addAndGet(nested.independentRetainedSize());
+            root.onRetainedSizeUpdate();
+        }
+    }
+
+    interface Root extends InternalNode, IMeasurableMemory
+    {
+        AtomicLong totalNestedRetainedSize();
+
+        default long unsharedHeapSize()

Review Comment:
   ```suggestion
           @Override
           default long unsharedHeapSize()
   ```
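The suggestion is valid Java: `@Override` is allowed on a `default` method that implements an abstract method inherited from a superinterface, and it makes the compiler reject the method if `IMeasurableMemory.unsharedHeapSize()` is ever renamed or removed. A minimal sketch follows; the body (own size plus nested total) is an assumption based on how the weighers in `ActiveRepairService` combine the two values, and `MeasurableMemory` stands in for `IMeasurableMemory`.

```java
// Local stand-in for org.apache.cassandra.cache.IMeasurableMemory.
interface MeasurableMemory
{
    long unsharedHeapSize();
}

interface Root extends MeasurableMemory
{
    long independentRetainedSize();

    long totalNestedRetainedSize();

    // Compiles only because MeasurableMemory declares unsharedHeapSize(); a mismatched
    // name or signature is now a compile error rather than a silently unrelated method.
    @Override
    default long unsharedHeapSize()
    {
        return independentRetainedSize() + totalNestedRetainedSize();
    }
}

class OverrideDefaultDemo
{
    public static void main(String[] args)
    {
        Root root = new Root()
        {
            @Override
            public long independentRetainedSize()
            {
                return 96;
            }

            @Override
            public long totalNestedRetainedSize()
            {
                return 160;
            }
        };
        System.out.println(root.unsharedHeapSize()); // prints 256
    }
}
```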



##########
src/java/org/apache/cassandra/repair/state/WeightedHierarchy.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.cassandra.repair.state;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+
+/**
+ * A pattern for tracking the weight of a hierarchy of references. Registered objects are treated as though they are
+ * immutable, and are only measured on registration.
+ *
+ * Internal nodes must maintain a reference to the root, in order to notify it of any new registrations that may impact
+ * the total weight of the hierarchy.
+ */
+class WeightedHierarchy
+{
+    interface Node
+    {
+        // How much does this object retain without any of its nested state
+        long independentRetainedSize();

Review Comment:
   we spoke before, but new readers won't know why you did this vs `IMeasurableMemory`... can you document this?
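One way to write that down, as a sketch only: the rationale below is inferred from the diff (the `Node` comment says the method excludes nested state, and the weighers add it to the root's nested total), not taken from the author, and should be replaced with the reasoning discussed offline. `NodeDocDemo` is hypothetical.

```java
/**
 * A participant in a weighted hierarchy.
 *
 * <p>This intentionally does not reuse {@code IMeasurableMemory#unsharedHeapSize()}. For the root,
 * {@code unsharedHeapSize()} reports the size of the whole hierarchy, while this method reports only
 * the bytes held directly by one object. Each node is measured once, when it is registered, and its
 * independent size is added to the root's running total, so nested state is never counted twice.
 */
interface Node
{
    /** Bytes retained by this object, excluding any nested {@code Node} it references. */
    long independentRetainedSize();
}

class NodeDocDemo
{
    public static void main(String[] args)
    {
        Node parent = () -> 24; // measured without its child
        Node child = () -> 40;  // registered separately
        // Summing independent sizes counts each object exactly once.
        System.out.println(parent.independentRetainedSize() + child.independentRetainedSize()); // prints 64
    }
}
```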



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -245,16 +250,64 @@ public ActiveRepairService(SharedContext ctx)
                                              .build();
 
         DurationSpec.LongNanosecondsBound duration = getRepairStateExpires();
-        int numElements = getRepairStateSize();
-        logger.info("Storing repair state for {} or for {} elements", duration, numElements);
-        repairs = CacheBuilder.newBuilder()
+        DataStorageSpec.IntBytesBound maxRetainedSize = getRepairStateHeapSize();
+        Integer numElements = getRepairStateSize();
+        if (numElements != null)
+        {
+            logger.info("Storing repair state for {} or for {} cached elements", duration, numElements);
+            repairs = Caffeine.newBuilder()
                               .expireAfterWrite(duration.quantity(), duration.unit())
                               .maximumSize(numElements)
+                              .executor(ImmediateExecutor.INSTANCE)
                               .build();
-        participates = CacheBuilder.newBuilder()
+            participates = Caffeine.newBuilder()
                                    .expireAfterWrite(duration.quantity(), duration.unit())
                                    .maximumSize(numElements)
+                                   .executor(ImmediateExecutor.INSTANCE)
                                    .build();
+        }
+        else if (maxRetainedSize != null)
+        {
+            logger.info("Storing repair state for {} or for {} retained size", duration, maxRetainedSize);
+            repairs = Caffeine.newBuilder()
+                              .expireAfterWrite(duration.quantity(), duration.unit())
+                              .weigher(new Weigher<TimeUUID, CoordinatorState>()
+                              {
+                                  @Override
+                                  public int weigh(TimeUUID id, CoordinatorState coordinatorState)
+                                  {
+                                      // REVIEW: This addition is unchecked - I was planning to not check memory accounting
+                                      // additions into long because the maximum before overflow is ~thousands of PB, but
+                                      // I check any casts into ints (4GB). Is that a reasonable policy?
+
+                                      long retained = coordinatorState.totalNestedRetainedSize().get() + coordinatorState.independentRetainedSize();

Review Comment:
   ```suggestion
                                         long retained = coordinatorState.unsharedHeapSize();
   ```



##########
src/java/org/apache/cassandra/repair/state/WeightedHierarchy.java:
##########
@@ -0,0 +1,63 @@
+package org.apache.cassandra.repair.state;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+
+/**
+ * A pattern for tracking the weight of a hierarchy of references. Registered objects are treated as though they are
+ * immutable, and are only measured on registration.
+ *
+ * Internal nodes must maintain a reference to the root, in order to notify it of any new registrations that may impact
+ * the total weight of the hierarchy.
+ */
+class WeightedHierarchy

Review Comment:
   really not a fan of this class, it feels like it's more complex than it really needs to be... why did you go this route vs your original?
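For comparison only, here is one possibly simpler shape. It is not the author's original approach (which is not shown in this thread), and every name in it is hypothetical. A single final class holds the counter, each top-level cached state owns one, and nested objects report their size to it once on creation; the `onUpdate` callback plays the role of `Root.onRetainedSizeUpdate()`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical alternative: one small final class replaces Node/InternalNode/Root.
final class HeapTracker
{
    private final AtomicLong bytes = new AtomicLong();
    private final Runnable onUpdate;

    HeapTracker(long initialBytes, Runnable onUpdate)
    {
        bytes.set(initialBytes);
        this.onUpdate = onUpdate;
    }

    // Called once when a nested object is created; it is treated as immutable afterwards.
    void register(long nestedBytes)
    {
        bytes.addAndGet(nestedBytes);
        onUpdate.run();
    }

    long unsharedHeapSize()
    {
        return bytes.get();
    }
}

class HeapTrackerDemo
{
    public static void main(String[] args)
    {
        AtomicLong updates = new AtomicLong();
        HeapTracker tracker = new HeapTracker(128, updates::incrementAndGet);
        tracker.register(64);
        tracker.register(32);
        // prints "224 bytes after 2 updates"
        System.out.println(tracker.unsharedHeapSize() + " bytes after " + updates.get() + " updates");
    }
}
```

The trade-off is that the tracker still has to be passed to every nested object, much as `InternalNode.root()` is today, but there is no interface hierarchy to implement.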



##########
src/java/org/apache/cassandra/service/ActiveRepairService.java:
##########
@@ -245,16 +250,64 @@ public ActiveRepairService(SharedContext ctx)
                                              .build();
 
         DurationSpec.LongNanosecondsBound duration = getRepairStateExpires();
-        int numElements = getRepairStateSize();
-        logger.info("Storing repair state for {} or for {} elements", duration, numElements);
-        repairs = CacheBuilder.newBuilder()
+        DataStorageSpec.IntBytesBound maxRetainedSize = getRepairStateHeapSize();
+        Integer numElements = getRepairStateSize();
+        if (numElements != null)
+        {
+            logger.info("Storing repair state for {} or for {} cached elements", duration, numElements);
+            repairs = Caffeine.newBuilder()
                               .expireAfterWrite(duration.quantity(), duration.unit())
                               .maximumSize(numElements)
+                              .executor(ImmediateExecutor.INSTANCE)
                               .build();
-        participates = CacheBuilder.newBuilder()
+            participates = Caffeine.newBuilder()
                                    .expireAfterWrite(duration.quantity(), duration.unit())
                                    .maximumSize(numElements)
+                                   .executor(ImmediateExecutor.INSTANCE)
                                    .build();
+        }
+        else if (maxRetainedSize != null)
+        {
+            logger.info("Storing repair state for {} or for {} retained size", duration, maxRetainedSize);
+            repairs = Caffeine.newBuilder()
+                              .expireAfterWrite(duration.quantity(), duration.unit())
+                              .weigher(new Weigher<TimeUUID, CoordinatorState>()
+                              {
+                                  @Override
+                                  public int weigh(TimeUUID id, CoordinatorState coordinatorState)
+                                  {
+                                      // REVIEW: This addition is unchecked - I was planning to not check memory accounting
+                                      // additions into long because the maximum before overflow is ~thousands of PB, but
+                                      // I check any casts into ints (4GB). Is that a reasonable policy?
+
+                                      long retained = coordinatorState.totalNestedRetainedSize().get() + coordinatorState.independentRetainedSize();
+                                      int clamped = Ints.saturatedCast(retained);
+                                      return Math.max(1, clamped);
+                                  }
+                              })
+                              .maximumWeight(maxRetainedSize.toBytes())
+                              .executor(ImmediateExecutor.INSTANCE)
+                              .build();
+            participates = Caffeine.newBuilder()
+                                   .expireAfterWrite(duration.quantity(), duration.unit())
+                                   .weigher(new Weigher<TimeUUID, ParticipateState>()
+                                   {
+                                       @Override
+                                       public int weigh(TimeUUID id, ParticipateState participateState)
+                                       {
+                                           long retained = participateState.totalNestedRetainedSize().get() + participateState.independentRetainedSize();

Review Comment:
   ```suggestion
                                              long retained = participateState.unsharedHeapSize();
   ```



##########
src/java/org/apache/cassandra/config/Config.java:
##########
@@ -910,7 +910,11 @@ public static void setClientMode(boolean clientMode)
     public volatile Map<StartupCheckType, Map<String, Object>> startup_checks = new HashMap<>();
 
     public volatile DurationSpec.LongNanosecondsBound repair_state_expires = new DurationSpec.LongNanosecondsBound("3d");
-    public volatile int repair_state_size = 100_000;
+
+    // Only one of repair_state_size and repair_state_heap_size should be set
+    @Deprecated
+    public volatile Integer repair_state_size = null;
+    public volatile DataStorageSpec.IntBytesBound repair_state_heap_size = new DataStorageSpec.IntBytesBound(5, DataStorageSpec.DataStorageUnit.MEBIBYTES);

Review Comment:
   spoke in slack, this is a bug: because the default is a non-null `5mb`, we cannot tell whether the user set `5mb` or not...
   
   here are the semantics we would want in yaml
   
   ```
   repair_state_size: 42
   ```
   
   we are size based
   
   ```
   repair_state_heap_size: 10mb
   ```
   
   we are heap based
   
   ```
   repair_state_size: 42
   repair_state_heap_size: 10mb
   ```
   
   reject
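A minimal sketch of that validation, assuming both `Config` fields default to `null` so that an explicit setting can be told apart from the default. `RepairStateLimit`, `resolve`, and the 5 MiB fallback when neither option is set are assumptions (the thread does not say what should happen in that case). Real code would more likely take `DataStorageSpec.IntBytesBound` and throw Cassandra's `ConfigurationException` rather than use a plain `Long` and `IllegalArgumentException`.

```java
// Hypothetical resolution of repair_state_size vs repair_state_heap_size.
final class RepairStateLimit
{
    enum Mode { ELEMENT_COUNT, HEAP_BYTES }

    // Assumed fallback when neither option is set; mirrors the 5 MiB default in the diff.
    static final long DEFAULT_HEAP_BYTES = 5L * 1024 * 1024;

    final Mode mode;
    final long limit; // number of entries for ELEMENT_COUNT, bytes for HEAP_BYTES

    private RepairStateLimit(Mode mode, long limit)
    {
        this.mode = mode;
        this.limit = limit;
    }

    // Each argument is null when the operator did not set that option in cassandra.yaml.
    static RepairStateLimit resolve(Integer repairStateSize, Long repairStateHeapSizeBytes)
    {
        if (repairStateSize != null && repairStateHeapSizeBytes != null)
            throw new IllegalArgumentException("Only one of repair_state_size and repair_state_heap_size may be set");
        if (repairStateSize != null)
            return new RepairStateLimit(Mode.ELEMENT_COUNT, repairStateSize);
        if (repairStateHeapSizeBytes != null)
            return new RepairStateLimit(Mode.HEAP_BYTES, repairStateHeapSizeBytes);
        return new RepairStateLimit(Mode.HEAP_BYTES, DEFAULT_HEAP_BYTES);
    }

    public static void main(String[] args)
    {
        System.out.println(resolve(42, null).mode);                // prints ELEMENT_COUNT
        System.out.println(resolve(null, 10L * 1024 * 1024).mode); // prints HEAP_BYTES
        try
        {
            resolve(42, 10L * 1024 * 1024);
        }
        catch (IllegalArgumentException e)
        {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```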



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
