frankgh commented on code in PR #111: URL: https://github.com/apache/cassandra-sidecar/pull/111#discussion_r1561784788
########## src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.WorkerExecutor; +import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration; +import org.apache.cassandra.sidecar.metrics.ResourceMetrics; +import org.apache.cassandra.sidecar.metrics.Timer; + +/** + * A pool of executors backed by {@link WorkerExecutor} and {@link Vertx} + */ +public abstract class TaskExecutorPool implements WorkerExecutor +{ + private final Vertx vertx; + private final WorkerExecutor workerExecutor; + + private TaskExecutorPool(Vertx vertx, WorkerPoolConfiguration config) + { + this.vertx = vertx; + this.workerExecutor = vertx.createSharedWorkerExecutor(config.workerPoolName(), + config.workerPoolSize(), + config.workerMaxExecutionTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** + * Like {@link #setPeriodic(long, 
Handler, boolean)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler) + { + return setPeriodic(delay, handler, false); + } + + /** + * Like {@link Vertx#setPeriodic(long, Handler)} with the handler guaranteed to be executed in a worker thread. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true and if executeBlocking is called several times on the same context, the executions + * for that context will be executed serially, not in parallel. The periodic scheduled + * executions could be delayed if the prior execution on the same context is taking longer. + * If false then they will be no ordering guarantees + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler, boolean ordered) + { + return setPeriodic(delay, delay, handler, ordered); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. 
+ * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler) + { + return setPeriodic(initialDelay, delay, handler, false); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. + * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true then executeBlocking is called several times on the same context, the + * executions for that context will be executed serially, not in parallel. if false + * then they will be no ordering guarantees + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler, boolean ordered) + { + return vertx.setPeriodic(initialDelay, + delay, + id -> workerExecutor.executeBlocking(() -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + return id; + }, ordered)); + } + + /** + * Like {@link #setTimer(long, Handler)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. 
+ * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setTimer(long delay, Handler<Long> handler) + { + return vertx.setTimer(delay, id -> + workerExecutor.executeBlocking(promise -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); Review Comment: unrelated to the PR, but do we need to complete the `promise` here? ########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/InstanceRestoreMetrics.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.metrics.instance; + +import java.util.Objects; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import org.apache.cassandra.sidecar.metrics.NamedMetric; + +import static org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics.INSTANCE_PREFIX; + +/** + * {@link InstanceRestoreMetrics} contains metrics to track restore task for a Cassandra instance maintained by Sidecar. + */ +public class InstanceRestoreMetrics +{ + public static final String DOMAIN = INSTANCE_PREFIX + ".restore"; + protected final MetricRegistry metricRegistry; + public final NamedMetric<Timer> sliceCompletionTime; + public final NamedMetric<Timer> sliceImportTime; + public final NamedMetric<Timer> sliceStageTime; + public final NamedMetric<Timer> sliceUnzipTime; + public final NamedMetric<Timer> sliceValidationTime; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceDownloadTimeouts; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceDownloadRetries; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceChecksumMismatches; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceImportQueueLength; + public final NamedMetric<DefaultSettableGauge<Integer>> pendingSliceCount; + public final NamedMetric<DefaultSettableGauge<Long>> dataSSTableComponentSize; + public final NamedMetric<Timer> sliceDownloadTime; + public final NamedMetric<DefaultSettableGauge<Long>> sliceCompressedSizeInBytes; + public final NamedMetric<DefaultSettableGauge<Long>> sliceUncompressedSizeInBytes; + public final NamedMetric<Timer> slowRestoreTaskTime; + + public InstanceRestoreMetrics(MetricRegistry metricRegistry) + { + this.metricRegistry = Objects.requireNonNull(metricRegistry, "Metric registry can not be null"); + + sliceCompletionTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceCompletionTime").build(); + 
sliceImportTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceImportTime").build(); + sliceStageTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceStageTime").build(); + sliceUnzipTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceUnzipTime").build(); + sliceValidationTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceValidationTime").build(); + sliceDownloadTimeouts + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) + .withDomain(DOMAIN) + .withName("SliceDownloadTimeouts") + .build(); + sliceDownloadRetries + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) + .withDomain(DOMAIN) + .withName("SliceCompletionRetries") Review Comment: The field is called `sliceDownloadRetries`, but the metric is named `SliceCompletionRetries`. Should the metric name match the field (e.g. `SliceDownloadRetries`)? ########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/InstanceRestoreMetrics.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.sidecar.metrics.instance; + +import java.util.Objects; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import org.apache.cassandra.sidecar.metrics.NamedMetric; + +import static org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics.INSTANCE_PREFIX; + +/** + * {@link InstanceRestoreMetrics} contains metrics to track restore task for a Cassandra instance maintained by Sidecar. + */ +public class InstanceRestoreMetrics +{ + public static final String DOMAIN = INSTANCE_PREFIX + ".restore"; Review Comment: NIT: for consistency with the uppercasing in other metrics ```suggestion public static final String DOMAIN = INSTANCE_PREFIX + ".Restore"; ``` ########## src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java: ########## @@ -312,13 +325,17 @@ void decrementActiveSliceCount(RestoreSlice slice) void captureImportQueueLength() { activeSliceCounterPerInstance.forEach((instanceId, counter) -> - stats.captureSliceImportQueueLength(instanceId, counter.get())); + metadataFetcher.instance(instanceId) Review Comment: should capture be specific to the instance we are processing at the moment? i.e. just pass the `slice` as parameter and report it for the individual slice? ########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/SSTableImportMetrics.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.metrics.instance; + +import java.util.Objects; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.MetricRegistry; +import org.apache.cassandra.sidecar.metrics.NamedMetric; + +import static org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics.INSTANCE_PREFIX; + +/** + * Tracks metrics during SSTable import operation done in Cassandra instance + */ +public class SSTableImportMetrics +{ + public static final String DOMAIN = INSTANCE_PREFIX + ".SSTableImport"; + protected final MetricRegistry metricRegistry; + public final NamedMetric<DefaultSettableGauge<Integer>> cassandraUnavailable; Review Comment: These metrics are a bit confusing. `cassandraUnavailable` is recorded in the handler but not in `SSTableImporter`, while `pendingImports` is recorded in `SSTableImporter`. Should `cassandraUnavailable` also be recorded when `SSTableImporter` finds Cassandra unavailable?
########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/UploadSSTableMetrics.java: ########## @@ -55,8 +62,9 @@ public static class UploadSSTableComponentMetrics { protected final MetricRegistry metricRegistry; public final String sstableComponent; - public final NamedMetric<Meter> rateLimitedCalls; - public final NamedMetric<Meter> diskUsageHigh; + public final NamedMetric<DefaultSettableGauge<Integer>> rateLimitedCalls; + public final NamedMetric<DefaultSettableGauge<Integer>> diskUsageHigh; Review Comment: nit ```suggestion public final NamedMetric<DefaultSettableGauge<Integer>> highDiskUsage; ``` ########## src/main/java/org/apache/cassandra/sidecar/metrics/RestoreMetrics.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.metrics; + +import java.util.Objects; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +/** + * Tracks metrics related to restore functionality provided by Sidecar. 
+ */ +@Singleton +public class RestoreMetrics +{ + public static final String DOMAIN = "sidecar.restore"; Review Comment: NIT ```suggestion public static final String DOMAIN = "Sidecar.Restore"; ``` ########## src/main/java/org/apache/cassandra/sidecar/metrics/Timer.java: ########## @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.cassandra.sidecar.stats; +package org.apache.cassandra.sidecar.metrics; Review Comment: We have some unused methods in this class. Should we clean them up? ########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/InstanceRestoreMetrics.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.metrics.instance; + +import java.util.Objects; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import org.apache.cassandra.sidecar.metrics.NamedMetric; + +import static org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics.INSTANCE_PREFIX; + +/** + * {@link InstanceRestoreMetrics} contains metrics to track restore task for a Cassandra instance maintained by Sidecar.
+ */ +public class InstanceRestoreMetrics +{ + public static final String DOMAIN = INSTANCE_PREFIX + ".restore"; + protected final MetricRegistry metricRegistry; + public final NamedMetric<Timer> sliceCompletionTime; + public final NamedMetric<Timer> sliceImportTime; + public final NamedMetric<Timer> sliceStageTime; + public final NamedMetric<Timer> sliceUnzipTime; + public final NamedMetric<Timer> sliceValidationTime; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceDownloadTimeouts; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceDownloadRetries; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceChecksumMismatches; + public final NamedMetric<DefaultSettableGauge<Integer>> sliceImportQueueLength; + public final NamedMetric<DefaultSettableGauge<Integer>> pendingSliceCount; + public final NamedMetric<DefaultSettableGauge<Long>> dataSSTableComponentSize; + public final NamedMetric<Timer> sliceDownloadTime; + public final NamedMetric<DefaultSettableGauge<Long>> sliceCompressedSizeInBytes; + public final NamedMetric<DefaultSettableGauge<Long>> sliceUncompressedSizeInBytes; + public final NamedMetric<Timer> slowRestoreTaskTime; + + public InstanceRestoreMetrics(MetricRegistry metricRegistry) + { + this.metricRegistry = Objects.requireNonNull(metricRegistry, "Metric registry can not be null"); + + sliceCompletionTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceCompletionTime").build(); + sliceImportTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceImportTime").build(); + sliceStageTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceStageTime").build(); + sliceUnzipTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceUnzipTime").build(); + sliceValidationTime + = NamedMetric.builder(metricRegistry::timer).withDomain(DOMAIN).withName("SliceValidationTime").build(); + sliceDownloadTimeouts + = 
NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) + .withDomain(DOMAIN) + .withName("SliceDownloadTimeouts") + .build(); + sliceDownloadRetries + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) + .withDomain(DOMAIN) + .withName("SliceCompletionRetries") + .build(); + sliceChecksumMismatches + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) + .withDomain(DOMAIN) + .withName("SliceChecksumMismatches") + .build(); + sliceImportQueueLength + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) + .withDomain(DOMAIN) + .withName("SliceImportQueueLength") + .build(); + pendingSliceCount + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) + .withDomain(DOMAIN) + .withName("PendingSliceCount") + .build(); + dataSSTableComponentSize + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0L))) + .withDomain(DOMAIN) + .withName("RestoreDataSizeBytes") + .addTag("Component", "db") Review Comment: Should this be Data.db instead? db is not unique for the data file ########## src/main/java/org/apache/cassandra/sidecar/config/yaml/VertxMetricsConfigurationImpl.java: ########## @@ -32,34 +29,28 @@ public class VertxMetricsConfigurationImpl implements VertxMetricsConfiguration public static final boolean DEFAULT_ENABLED = true; public static final boolean DEFAULT_EXPOSE_VIA_JMX = false; public static final String DEFAULT_JMX_DOMAIN_NAME = "sidecar.vertx.jmx_domain"; - public static final List<String> DEFAULT_MONITORED_SERVER_ROUTE_REGEXES = Collections.singletonList("/api/v1/.*"); Review Comment: where will this go? 
########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/SSTableImportMetrics.java: ########## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.metrics.instance; + +import java.util.Objects; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.MetricRegistry; +import org.apache.cassandra.sidecar.metrics.NamedMetric; + +import static org.apache.cassandra.sidecar.metrics.instance.InstanceMetrics.INSTANCE_PREFIX; + +/** + * Tracks metrics during SSTable import operation done in Cassandra instance + */ +public class SSTableImportMetrics +{ + public static final String DOMAIN = INSTANCE_PREFIX + ".SSTableImport"; + protected final MetricRegistry metricRegistry; + public final NamedMetric<DefaultSettableGauge<Integer>> cassandraUnavailable; + public final NamedMetric<DefaultSettableGauge<Integer>> pendingImports; Review Comment: I think it makes sense to track successful imports as well. What do you think?
########## src/main/java/org/apache/cassandra/sidecar/restore/RestoreProcessor.java: ########## @@ -141,21 +146,26 @@ public void execute(Promise<Void> promise) sliceQueue.captureImportQueueLength(); RestoreSliceHandler task = slice.toAsyncTask(s3ClientPool, pool, importer, requiredUsableSpacePercentage, - sliceDatabaseAccessor, stats, - restoreJobUtil); + sliceDatabaseAccessor, + restoreJobUtil, + restoreMetrics); activeTasks.add(task); pool.executeBlocking(task, false) // unordered; run in parallel .onSuccess(restoreSlice -> { int instanceId = slice.owner().id(); Review Comment: `instanceId` is now unused ########## src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.WorkerExecutor; +import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration; +import org.apache.cassandra.sidecar.metrics.ResourceMetrics; +import org.apache.cassandra.sidecar.metrics.Timer; + +/** + * A pool of executors backed by {@link WorkerExecutor} and {@link Vertx} + */ +public abstract class TaskExecutorPool implements WorkerExecutor +{ + private final Vertx vertx; + private final WorkerExecutor workerExecutor; + + private TaskExecutorPool(Vertx vertx, WorkerPoolConfiguration config) + { + this.vertx = vertx; + this.workerExecutor = vertx.createSharedWorkerExecutor(config.workerPoolName(), + config.workerPoolSize(), + config.workerMaxExecutionTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** + * Like {@link #setPeriodic(long, Handler, boolean)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler) + { + return setPeriodic(delay, handler, false); + } + + /** + * Like {@link Vertx#setPeriodic(long, Handler)} with the handler guaranteed to be executed in a worker thread. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true and if executeBlocking is called several times on the same context, the executions + * for that context will be executed serially, not in parallel. 
The periodic scheduled + * executions could be delayed if the prior execution on the same context is taking longer. + * If false then they will be no ordering guarantees + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler, boolean ordered) + { + return setPeriodic(delay, delay, handler, ordered); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. + * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler) + { + return setPeriodic(initialDelay, delay, handler, false); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. + * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true then executeBlocking is called several times on the same context, the + * executions for that context will be executed serially, not in parallel. 
if false + * then they will be no ordering guarantees + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler, boolean ordered) + { + return vertx.setPeriodic(initialDelay, + delay, + id -> workerExecutor.executeBlocking(() -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + return id; + }, ordered)); + } + + /** + * Like {@link #setTimer(long, Handler)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setTimer(long delay, Handler<Long> handler) + { + return vertx.setTimer(delay, id -> + workerExecutor.executeBlocking(promise -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + }, false)); + } + + /** + * Like {@link Vertx#setTimer(long, Handler)} with the handler guaranteed to be executed in a worker thread. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true and if executeBlocking is called several times on the same context, the executions + * for that context will be executed serially, not in parallel. The periodic scheduled + * executions could be delayed if the prior execution on the same context is taking longer. 
+ * If false then they will be no ordering guarantees + * @return the unique identifier for the timer + */ + public long setTimer(long delay, Handler<Long> handler, boolean ordered) + { + return vertx.setTimer(delay, id -> workerExecutor.executeBlocking(() -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + return id; + }, ordered)); + } + + /** + * Delegate to {@link Vertx#cancelTimer(long)} + * + * @param id The id of the timer to cancel + * @return {@code true} if the timer was successfully cancelled, {@code false} otherwise + */ + public boolean cancelTimer(long id) + { + return vertx.cancelTimer(id); + } + + @Override + public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, + boolean ordered, + Handler<AsyncResult<T>> asyncResultHandler) + { + workerExecutor.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); + } + + @Override + public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, + boolean ordered) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler, ordered), this::recordTimeTaken); + } + + @Override + public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler, ordered), this::recordTimeTaken); + } + + @Override + public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, + Handler<AsyncResult<T>> asyncResultHandler) + { + workerExecutor.executeBlocking(blockingCodeHandler, asyncResultHandler); + } + + @Override + public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler), this::recordTimeTaken); + } + + /** + * Records time taken for tasks executed by {@link TaskExecutorPool} + * @param duration time taken by a task + */ + protected abstract void recordTimeTaken(long 
duration); Review Comment: ```suggestion protected abstract void recordTimeTaken(long durationNanos); ``` ########## src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java: ########## @@ -159,16 +159,45 @@ protected ImportQueue initializeQueue(String key) */ private void processPendingImports(Long timerId) { - for (ImportQueue queue : importQueuePerHost.values()) + for (Map.Entry<String, ImportQueue> entry : importQueuePerHost.entrySet()) { + ImportQueue queue = entry.getValue(); if (!queue.isEmpty()) { + recordPendingImports(hostFromKey(entry.getKey()), entry.getValue().size()); executorPools.internal() .executeBlocking(p -> maybeDrainImportQueue(queue)); } } } + private void recordPendingImports(String host, int pendingImports) + { + if (host != null) + { + metadataFetcher.instance(host) + .metrics() + .sstableImport().pendingImports.metric.setValue(pendingImports); + } + } + + /** + * Host is extracted from key for publishing Cassandra instance specific metrics related to {@link ImportQueue} + * + * @param key a key for one of the import queues + * @return host for which the import queue is maintained + */ + protected String hostFromKey(String key) + { + String[] keyParts = key.split("\\$"); Review Comment: do we really want to parse the key here? is there a better way to do this? ########## src/main/java/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableUploadHandler.java: ########## @@ -127,7 +123,7 @@ public void handleInternal(RoutingContext context, if (!limiter.tryAcquire()) { String message = String.format("Concurrent upload limit (%d) exceeded", limiter.limit()); - componentMetrics.rateLimitedCalls.metric.mark(); + componentMetrics.rateLimitedCalls.metric.setValue(1); Review Comment: The component metrics seem wrong or misleading. When I think of components I think of SSTable Components. 
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java#L206 But you are really tracking by file extension, which is different from tracking by component. I think we should either rename the metric, or if we really want to track SSTable components, we should properly determine the SSTable component. ########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/UploadSSTableMetrics.java: ########## @@ -55,8 +62,9 @@ public static class UploadSSTableComponentMetrics { protected final MetricRegistry metricRegistry; public final String sstableComponent; - public final NamedMetric<Meter> rateLimitedCalls; - public final NamedMetric<Meter> diskUsageHigh; + public final NamedMetric<DefaultSettableGauge<Integer>> rateLimitedCalls; + public final NamedMetric<DefaultSettableGauge<Integer>> diskUsageHigh; Review Comment: Why is the disk usage associated with the SSTable component? Wouldn't this be a metric that tracks the staging directory instead? ########## src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.WorkerExecutor; +import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration; +import org.apache.cassandra.sidecar.metrics.ResourceMetrics; +import org.apache.cassandra.sidecar.metrics.Timer; + +/** + * A pool of executors backed by {@link WorkerExecutor} and {@link Vertx} + */ +public abstract class TaskExecutorPool implements WorkerExecutor +{ + private final Vertx vertx; + private final WorkerExecutor workerExecutor; + + private TaskExecutorPool(Vertx vertx, WorkerPoolConfiguration config) + { + this.vertx = vertx; + this.workerExecutor = vertx.createSharedWorkerExecutor(config.workerPoolName(), + config.workerPoolSize(), + config.workerMaxExecutionTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** + * Like {@link #setPeriodic(long, Handler, boolean)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler) + { + return setPeriodic(delay, handler, false); + } + + /** + * Like {@link Vertx#setPeriodic(long, Handler)} with the handler guaranteed to be executed in a worker thread. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true and if executeBlocking is called several times on the same context, the executions + * for that context will be executed serially, not in parallel. 
The periodic scheduled + * executions could be delayed if the prior execution on the same context is taking longer. + * If false then they will be no ordering guarantees + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler, boolean ordered) + { + return setPeriodic(delay, delay, handler, ordered); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. + * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler) + { + return setPeriodic(initialDelay, delay, handler, false); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. + * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true then executeBlocking is called several times on the same context, the + * executions for that context will be executed serially, not in parallel. 
if false + * then they will be no ordering guarantees + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler, boolean ordered) + { + return vertx.setPeriodic(initialDelay, + delay, + id -> workerExecutor.executeBlocking(() -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + return id; + }, ordered)); + } + + /** + * Like {@link #setTimer(long, Handler)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setTimer(long delay, Handler<Long> handler) + { + return vertx.setTimer(delay, id -> + workerExecutor.executeBlocking(promise -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + }, false)); + } + + /** + * Like {@link Vertx#setTimer(long, Handler)} with the handler guaranteed to be executed in a worker thread. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true and if executeBlocking is called several times on the same context, the executions + * for that context will be executed serially, not in parallel. The periodic scheduled + * executions could be delayed if the prior execution on the same context is taking longer. 
+ * If false then they will be no ordering guarantees + * @return the unique identifier for the timer + */ + public long setTimer(long delay, Handler<Long> handler, boolean ordered) + { + return vertx.setTimer(delay, id -> workerExecutor.executeBlocking(() -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + return id; + }, ordered)); + } + + /** + * Delegate to {@link Vertx#cancelTimer(long)} + * + * @param id The id of the timer to cancel + * @return {@code true} if the timer was successfully cancelled, {@code false} otherwise + */ + public boolean cancelTimer(long id) + { + return vertx.cancelTimer(id); + } + + @Override + public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, + boolean ordered, + Handler<AsyncResult<T>> asyncResultHandler) + { + workerExecutor.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); + } + + @Override + public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, + boolean ordered) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler, ordered), this::recordTimeTaken); + } + + @Override + public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler, ordered), this::recordTimeTaken); + } + + @Override + public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, + Handler<AsyncResult<T>> asyncResultHandler) + { + workerExecutor.executeBlocking(blockingCodeHandler, asyncResultHandler); + } + + @Override + public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler), this::recordTimeTaken); + } + + /** + * Records time taken for tasks executed by {@link TaskExecutorPool} + * @param duration time taken by a task Review Comment: we are missing the units here 
########## src/main/java/org/apache/cassandra/sidecar/concurrent/TaskExecutorPool.java: ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.concurrent; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.WorkerExecutor; +import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration; +import org.apache.cassandra.sidecar.metrics.ResourceMetrics; +import org.apache.cassandra.sidecar.metrics.Timer; + +/** + * A pool of executors backed by {@link WorkerExecutor} and {@link Vertx} + */ +public abstract class TaskExecutorPool implements WorkerExecutor +{ + private final Vertx vertx; + private final WorkerExecutor workerExecutor; + + private TaskExecutorPool(Vertx vertx, WorkerPoolConfiguration config) + { + this.vertx = vertx; + this.workerExecutor = vertx.createSharedWorkerExecutor(config.workerPoolName(), + config.workerPoolSize(), + config.workerMaxExecutionTimeMillis(), + TimeUnit.MILLISECONDS); + } + + /** + * Like {@link #setPeriodic(long, 
Handler, boolean)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler) + { + return setPeriodic(delay, handler, false); + } + + /** + * Like {@link Vertx#setPeriodic(long, Handler)} with the handler guaranteed to be executed in a worker thread. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true and if executeBlocking is called several times on the same context, the executions + * for that context will be executed serially, not in parallel. The periodic scheduled + * executions could be delayed if the prior execution on the same context is taking longer. + * If false then they will be no ordering guarantees + * @return the unique identifier for the timer + */ + public long setPeriodic(long delay, Handler<Long> handler, boolean ordered) + { + return setPeriodic(delay, delay, handler, ordered); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. 
+ * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler) + { + return setPeriodic(initialDelay, delay, handler, false); + } + + /** + * Set a periodic timer to fire every {@code delay} milliseconds with initial delay, at which point + * {@code handler} will be called with the id of the timer. + * + * @param initialDelay the initial delay in milliseconds + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true then executeBlocking is called several times on the same context, the + * executions for that context will be executed serially, not in parallel. if false + * then they will be no ordering guarantees + * @return the unique ID of the timer + */ + public long setPeriodic(long initialDelay, long delay, Handler<Long> handler, boolean ordered) + { + return vertx.setPeriodic(initialDelay, + delay, + id -> workerExecutor.executeBlocking(() -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + return id; + }, ordered)); + } + + /** + * Like {@link #setTimer(long, Handler)} with the handler guaranteed to be executed in a + * worker thread and ordered = false. 
+ * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @return the unique identifier for the timer + */ + public long setTimer(long delay, Handler<Long> handler) + { + return vertx.setTimer(delay, id -> + workerExecutor.executeBlocking(promise -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + }, false)); + } + + /** + * Like {@link Vertx#setTimer(long, Handler)} with the handler guaranteed to be executed in a worker thread. + * + * @param delay the delay in milliseconds, after which the timer will fire + * @param handler the handler that will be called with the timer ID when the timer fires + * @param ordered if true and if executeBlocking is called several times on the same context, the executions + * for that context will be executed serially, not in parallel. The periodic scheduled + * executions could be delayed if the prior execution on the same context is taking longer. 
+ * If false then they will be no ordering guarantees + * @return the unique identifier for the timer + */ + public long setTimer(long delay, Handler<Long> handler, boolean ordered) + { + return vertx.setTimer(delay, id -> workerExecutor.executeBlocking(() -> { + long startTime = System.nanoTime(); + handler.handle(id); + recordTimeTaken(System.nanoTime() - startTime); + return id; + }, ordered)); + } + + /** + * Delegate to {@link Vertx#cancelTimer(long)} + * + * @param id The id of the timer to cancel + * @return {@code true} if the timer was successfully cancelled, {@code false} otherwise + */ + public boolean cancelTimer(long id) + { + return vertx.cancelTimer(id); + } + + @Override + public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, + boolean ordered, + Handler<AsyncResult<T>> asyncResultHandler) + { + workerExecutor.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); + } + + @Override + public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, + boolean ordered) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler, ordered), this::recordTimeTaken); + } + + @Override + public <T> Future<T> executeBlocking(Callable<T> blockingCodeHandler, boolean ordered) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler, ordered), this::recordTimeTaken); + } + + @Override + public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, + Handler<AsyncResult<T>> asyncResultHandler) + { + workerExecutor.executeBlocking(blockingCodeHandler, asyncResultHandler); + } + + @Override + public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler) + { + return Timer.measureTimeTaken(workerExecutor.executeBlocking(blockingCodeHandler), this::recordTimeTaken); + } + + /** + * Records time taken for tasks executed by {@link TaskExecutorPool} + * @param duration time taken by a task + */ + protected abstract void recordTimeTaken(long 
duration); + + @Override + public void close(Handler<AsyncResult<Void>> handler) + { + throw new UnsupportedOperationException("Closing TaskExecutorPool is not supported!"); + } + + @Override + public Future<Void> close() + { + throw new UnsupportedOperationException("Closing TaskExecutorPool is not supported!"); + } + + Future<Void> closeInternal() + { + return workerExecutor == null + ? Future.succeededFuture() + : workerExecutor.close(); + } + + /** + * {@link ServiceTaskExecutorPool} is used for executing tasks that are short lived and not expected to block for + * too long, therefore will free up resources more quickly + */ + static class ServiceTaskExecutorPool extends TaskExecutorPool + { + private final ResourceMetrics metrics; + + ServiceTaskExecutorPool(Vertx vertx, WorkerPoolConfiguration config, ResourceMetrics metrics) + { + super(vertx, config); + this.metrics = metrics; + } + + @Override + protected void recordTimeTaken(long duration) + { + if (metrics == null) Review Comment: Will `metrics` ever be null? It doesn't seem like we do null checks for other metrics anywhere else in the code. ########## src/main/java/org/apache/cassandra/sidecar/metrics/instance/UploadSSTableMetrics.java: ########## @@ -71,15 +79,21 @@ public UploadSSTableComponentMetrics(MetricRegistry metricRegistry, String sstab NamedMetric.Tag componentTag = NamedMetric.Tag.of("component", sstableComponent); rateLimitedCalls - = NamedMetric.builder(metricRegistry::meter) + = NamedMetric.builder(name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))) Review Comment: I'm not sure whether rate-limited calls are tied to a specific component. Is there a reason to make this metric component-specific? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

