yifan-c commented on code in PR #72:
URL: https://github.com/apache/cassandra-sidecar/pull/72#discussion_r1367368330
##########
src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java:
##########
@@ -20,7 +20,8 @@
import java.util.List;
-import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.utils.CassandraAdapterDelegate;
Review Comment:
`CassandraAdapterDelegate` does not seem to fit in the `utils` package.
Among the existing packages, `cluster` would be a better fit.
##########
src/main/dist/conf/sidecar.yaml:
##########
@@ -68,10 +68,20 @@ sidecar:
port: 9043
request_idle_timeout_millis: 300000 # this field expects integer value
request_timeout_millis: 300000
+ tcp_keep_alive: false
+ accept_backlog: 1024
+ server_verticle_instances: 1
throttle:
stream_requests_per_sec: 5000
delay_sec: 5
timeout_sec: 10
+ traffic_shaping:
+ inbound_global_bandwidth_bps: 0 # 0 implies unthrottled, the inbound bandwidth in bytes per second
+ outbound_global_bandwidth_bps: 0 # 0 implies unthrottled, the outbound bandwidth in bytes per second
+ peak_outbound_global_bandwidth_bps: 419430400 # 400 Mbps, the peak outbound bandwidth in bytes per second
Review Comment:
`Mbps` means megabits per second, but the value here is actually in `MBps`
(megabytes per second).
The two abbreviations differ only in the case of the letter B, so they are
easy to confuse.
I would just use this for the comment:
"the peak outbound bandwidth in bytes per second. The default is 400
mebibytes per second"
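
To make the unit difference concrete, here is a minimal sketch of the arithmetic behind the configured default. `BandwidthUnits` and its methods are hypothetical names used only for illustration; they are not part of the PR.

```java
/** Illustrative helper; the class and method names are assumptions, not Sidecar code. */
final class BandwidthUnits
{
    static final long BYTES_PER_MEBIBYTE = 1024L * 1024L;

    /** Converts mebibytes per second to bytes per second. */
    static long mebibytesToBytes(long mebibytes)
    {
        return Math.multiplyExact(mebibytes, BYTES_PER_MEBIBYTE);
    }

    /** Converts bytes per second to megabits per second (1 megabit = 1,000,000 bits). */
    static double bytesToMegabits(long bytesPerSecond)
    {
        return bytesPerSecond * 8 / 1_000_000.0;
    }

    public static void main(String[] args)
    {
        long peak = mebibytesToBytes(400);
        System.out.println(peak + " bytes per second");
        System.out.println(bytesToMegabits(peak) + " megabits per second");
    }
}
```

400 mebibytes per second is exactly 419430400 bytes per second, which is roughly 3355 megabits per second, so reading the comment as "400 Mbps" understates the limit by about a factor of eight.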
##########
src/main/java/org/apache/cassandra/sidecar/concurrent/ExecutorPools.java:
##########
@@ -119,9 +120,45 @@ public long setPeriodic(long delay, Handler<Long> handler)
public long setPeriodic(long delay, Handler<Long> handler, boolean ordered)
{
return vertx.setPeriodic(delay,
- id -> workerExecutor.executeBlocking(promise -> {
+ id -> workerExecutor.executeBlocking(() -> {
handler.handle(id);
- promise.complete();
+ return id;
+ }, ordered));
Review Comment:
Instead of scheduling the timer directly, this can just call the other
`setPeriodic` overload.
```
public long setPeriodic(long delay, Handler<Long> handler, boolean ordered)
{
    return setPeriodic(delay, delay, handler, ordered);
}
```
##########
src/main/java/org/apache/cassandra/sidecar/config/TrafficShapingConfiguration.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import io.netty.handler.traffic.AbstractTrafficShapingHandler;
+import io.netty.handler.traffic.GlobalTrafficShapingHandler;
+
+/**
+ * Configuration for the global traffic shaping options. These TCP server
options enable configuration of
+ * bandwidth limiting. Both inbound and outbound bandwidth can be limited
through these options.
+ */
+public interface TrafficShapingConfiguration
+{
+ /**
+ * Default inbound bandwidth limit in bytes/sec = 0 (0 implies unthrottled)
+ */
+ long DEFAULT_INBOUND_GLOBAL_BANDWIDTH_LIMIT = 0;
Review Comment:
Why define the constants in the interface? How about moving them to
`TrafficShapingConfigurationImpl`?
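
A minimal sketch of that layout, assuming simplified stand-in types: `TrafficShapingSettings` and `TrafficShapingSettingsImpl` are hypothetical names, and only two of the options are shown. The default values are taken from the `sidecar.yaml` hunk in this PR.

```java
/** Read-only view of the traffic shaping settings; no constants live here. */
interface TrafficShapingSettings
{
    long inboundGlobalBandwidthBytesPerSecond();

    long peakOutboundGlobalBandwidthBytesPerSecond();
}

/** Implementation that owns the defaults. Hypothetical stand-in, not the PR's class. */
final class TrafficShapingSettingsImpl implements TrafficShapingSettings
{
    // 0 implies unthrottled
    static final long DEFAULT_INBOUND_GLOBAL_BANDWIDTH_LIMIT = 0L;
    // 400 mebibytes per second
    static final long DEFAULT_PEAK_OUTBOUND_GLOBAL_BANDWIDTH_LIMIT = 400L * 1024L * 1024L;

    private final long inbound;
    private final long peakOutbound;

    TrafficShapingSettingsImpl()
    {
        this(DEFAULT_INBOUND_GLOBAL_BANDWIDTH_LIMIT, DEFAULT_PEAK_OUTBOUND_GLOBAL_BANDWIDTH_LIMIT);
    }

    TrafficShapingSettingsImpl(long inbound, long peakOutbound)
    {
        this.inbound = inbound;
        this.peakOutbound = peakOutbound;
    }

    @Override
    public long inboundGlobalBandwidthBytesPerSecond()
    {
        return inbound;
    }

    @Override
    public long peakOutboundGlobalBandwidthBytesPerSecond()
    {
        return peakOutbound;
    }
}
```

Keeping the defaults in the implementation means the interface only describes what can be read, and implementers do not inherit constants that are not part of their contract.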
##########
src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java:
##########
@@ -33,11 +36,33 @@ public interface ServiceConfiguration
*/
String host();
+ /**
+ * Returns a list of interfaces where the Sidecar process will bind and
listen for connections. Defaults to
+ * the configured {@link #host()}.
+ *
+ * @return a list of interfaces where Sidecar will listen
+ */
+ default List<String> listenInterfaces()
+ {
+ return Collections.singletonList(Objects.requireNonNull(host(), "host must be provided"));
+ }
+
/**
* @return Sidecar's HTTP REST API port
*/
int port();
+ /**
+ * Returns a list of ports where the Sidecar process will bind and listen
for connections. Defaults to
+ * the configured {@link #port()}
+ *
+ * @return a list of ports where Sidecar will listen
+ */
+ default List<Integer> ports()
+ {
+ return Collections.singletonList(port());
Review Comment:
The combination of a network interface list and a port list is confusing.
For example, given interfaces [i1, i2] and ports [8000, 9000], does it
listen on [i1:8000, i1:9000, i2:8000, i2:9000] or on [i1:8000, i2:9000]?
For better API clarity, I would just return a list of `SocketAddress`.
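
The two readings can be sketched with plain JDK types. `ListenAddresses` and its methods are hypothetical names for illustration; the sketch uses `InetSocketAddress` as one concrete `SocketAddress` and creates the addresses unresolved so that no DNS lookup happens.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Illustrative only; the class and method names are assumptions, not Sidecar code. */
final class ListenAddresses
{
    /** One reading: every interface is paired with every port (cross product). */
    static List<InetSocketAddress> crossProduct(List<String> interfaces, List<Integer> ports)
    {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String host : interfaces)
        {
            for (int port : ports)
            {
                result.add(InetSocketAddress.createUnresolved(host, port));
            }
        }
        return result;
    }

    /** The other reading: the i-th interface is paired with the i-th port. */
    static List<InetSocketAddress> pairwise(List<String> interfaces, List<Integer> ports)
    {
        if (interfaces.size() != ports.size())
        {
            throw new IllegalArgumentException("interfaces and ports must have the same size");
        }
        List<InetSocketAddress> result = new ArrayList<>();
        for (int i = 0; i < interfaces.size(); i++)
        {
            result.add(InetSocketAddress.createUnresolved(interfaces.get(i), ports.get(i)));
        }
        return result;
    }
}
```

Returning the resulting list directly from the configuration removes the ambiguity, because each element already names exactly one host and port to bind.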
##########
src/main/java/org/apache/cassandra/sidecar/config/SslConfiguration.java:
##########
@@ -28,6 +28,29 @@ public interface SslConfiguration
*/
boolean enabled();
+ /**
+ * Returns {@code true} if the OpenSSL engine should be used, {@code false} otherwise. The OpenSSL engine
+ * will only be enabled if the native libraries for OpenSSL have been loaded correctly.
+ *
+ * @return {@code true} if the OpenSSL engine should be used, {@code false} otherwise
Review Comment:
nit:
- The description and the `@return` text duplicate each other.
- Since configuring `true` does not guarantee that OpenSSL is picked, it is
only an intent or preference. How about renaming the config to `preferOpenSSL`?
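
A small sketch of the "preference" semantics, under the assumption that availability of the native libraries is checked separately. `SslEngineSelection` is a hypothetical name, and the availability check is passed in as a supplier rather than tied to any particular TLS library.

```java
import java.util.function.BooleanSupplier;

/** Illustrative only; names are assumptions, not Sidecar code. */
final class SslEngineSelection
{
    enum Engine { JDK, OPENSSL }

    /**
     * Picks OpenSSL only when it is both preferred and actually available;
     * otherwise falls back to the JDK engine.
     */
    static Engine select(boolean preferOpenSsl, BooleanSupplier openSslAvailable)
    {
        return preferOpenSsl && openSslAvailable.getAsBoolean() ? Engine.OPENSSL : Engine.JDK;
    }
}
```

With this shape, `select(true, () -> false)` still yields the JDK engine, which is why `prefer` describes the option more accurately than a name implying a guarantee.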
##########
src/main/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTask.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+
+/**
+ * Periodically checks the health of every instance configured in the {@link
InstancesConfig}.
+ */
+@Singleton
+public class HealthCheckPeriodicTask implements PeriodicTask
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(HealthCheckPeriodicTask.class);
+ private final SidecarConfiguration configuration;
+ private final InstancesConfig instancesConfig;
+ private final ExecutorPools.TaskExecutorPool internalPool;
+
+ @Inject
+ public HealthCheckPeriodicTask(SidecarConfiguration configuration,
+ InstancesConfig instancesConfig,
+ ExecutorPools executorPools)
+ {
+ this.configuration = configuration;
+ this.instancesConfig = instancesConfig;
+ internalPool = executorPools.internal();
+ }
+
+ @Override
+ public long initialDelay()
+ {
+ return configuration.healthCheckConfiguration().initialDelayMillis();
+ }
+
+ @Override
+ public long delay()
+ {
+ return configuration.healthCheckConfiguration().checkIntervalMillis();
+ }
+
+ /**
+ * Run health checks on all the configured instances
+ */
+ @Override
+ public void execute()
+ {
+ instancesConfig.instances()
+ .forEach(instanceMetadata -> internalPool.executeBlocking(promise -> {
+ try
+ {
+ instanceMetadata.delegate().healthCheck();
+ promise.complete();
+ }
+ catch (Throwable cause)
+ {
+ promise.fail(cause);
+ LOGGER.error("Unable to complete health check on instance={}",
+ instanceMetadata.id(), cause);
+ }
+ }));
Review Comment:
nit: submit these unordered, i.e. with `ordered == false`.
##########
src/main/java/org/apache/cassandra/sidecar/concurrent/ExecutorPools.java:
##########
@@ -119,9 +120,45 @@ public long setPeriodic(long delay, Handler<Long> handler)
public long setPeriodic(long delay, Handler<Long> handler, boolean ordered)
{
return vertx.setPeriodic(delay,
- id -> workerExecutor.executeBlocking(promise -> {
+ id -> workerExecutor.executeBlocking(() -> {
handler.handle(id);
- promise.complete();
+ return id;
+ }, ordered));
+ }
+
+ /**
+ * Set a periodic timer to fire every {@code delay} milliseconds with
initial delay, at which point
+ * {@code handler} will be called with the id of the timer.
+ *
+ * @param initialDelay the initial delay in milliseconds
+ * @param delay the delay in milliseconds, after which the
timer will fire
+ * @param handler the handler that will be called with the timer
ID when the timer fires
+ * @return the unique ID of the timer
+ */
+ public long setPeriodic(long initialDelay, long delay, Handler<Long>
handler)
+ {
+ return setPeriodic(initialDelay, delay, handler, false);
+ }
+
+ /**
+ * Set a periodic timer to fire every {@code delay} milliseconds with
initial delay, at which point
+ * {@code handler} will be called with the id of the timer.
+ *
+ * @param initialDelay the initial delay in milliseconds
+ * @param delay the delay in milliseconds, after which the
timer will fire
+ * @param handler the handler that will be called with the timer
ID when the timer fires
+ * @param ordered if true then executeBlocking is called several
times on the same context, the
+ * executions for that context will be executed
serially, not in parallel. if false
+ * then they will be no ordering guarantees
+ * @return the unique ID of the timer
+ */
+ public long setPeriodic(long initialDelay, long delay, Handler<Long>
handler, boolean ordered)
+ {
+ return vertx.setPeriodic(initialDelay,
+ delay,
+ id -> workerExecutor.executeBlocking(()
-> {
+ handler.handle(id);
+ return id;
Review Comment:
Although the new library version recommends the new API that takes a
`Callable`, one difference I noticed in `WorkerExecutorImpl` is that the
`Callable` implementation does not check whether the worker executor is
closed, meaning the callable is scheduled on the closing pool regardless.
That could be a concern.
I would revert and use the deprecated API.
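
To illustrate the kind of guard I mean, here is a sketch using a plain JDK `ExecutorService` as a stand-in. It is not the Vert.x `WorkerExecutor` code, and `GuardedSubmit` is a hypothetical name; the point is only that the closed state is checked before the callable is scheduled.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

/** Illustrative JDK analogue; names are assumptions, not Vert.x or Sidecar code. */
final class GuardedSubmit
{
    /** Fails fast when the pool is already shut down instead of scheduling the task. */
    static <T> CompletableFuture<T> submit(ExecutorService pool, Callable<T> task)
    {
        CompletableFuture<T> future = new CompletableFuture<>();
        if (pool.isShutdown())
        {
            future.completeExceptionally(new IllegalStateException("Executor is closed"));
            return future;
        }
        try
        {
            pool.execute(() -> {
                try
                {
                    future.complete(task.call());
                }
                catch (Throwable cause)
                {
                    future.completeExceptionally(cause);
                }
            });
        }
        catch (RejectedExecutionException e)
        {
            // The pool was shut down between the check and the submission
            future.completeExceptionally(e);
        }
        return future;
    }
}
```

If the `Callable` variant is kept, a check along these lines in our own wrapper would be one way to avoid handing work to a pool that is closing.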
##########
src/main/java/org/apache/cassandra/sidecar/server/MainModule.java:
##########
@@ -241,11 +237,27 @@ public Router vertxRouter(Vertx vertx,
return router;
}
+ @Provides
+ @Singleton
+ public PeriodicTaskExecutor periodicTaskExecutor(Vertx vertx,
+ ExecutorPools executorPools,
+ HealthCheckPeriodicTask healthCheckTask)
+ {
+ EventBus eventBus = vertx.eventBus();
+ PeriodicTaskExecutor executor = new PeriodicTaskExecutor(executorPools);
+ eventBus.localConsumer(ON_SERVER_START, message -> executor.schedule(healthCheckTask));
+ eventBus.localConsumer(ON_SERVER_STOP, message -> executor.unschedule(healthCheckTask));
Review Comment:
nit: the event bus consumer registration for the health check fits better in
the `HealthCheckPeriodicTask` constructor.
The method here should just return an executor.
##########
src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.server;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * Defines the {@link io.vertx.core.eventbus.EventBus} addresses where
different notifications will be published
+ * during Sidecar startup/shutdown, as well as CQL connection availability.
+ *
+ * <p>The messages can be published multiple times depending on whether
Sidecar is started or stopped
+ * during the lifetime of the application. Implementing consumers will need to
deal with this expectation
+ * internally.
+ * <p>
+ * The expectation is that:
+ * <ul>
+ * <li>{@link #ON_SERVER_START} will happen first
+ * <li>{@link #ON_SERVER_STOP} can happen before {@link
#ON_ALL_CASSANDRA_CQL_READY}
+ * <li>{@link #ON_SERVER_START} can only happen for any subsequent calls only
after a {@link #ON_SERVER_STOP} message
+ * <li>{@link #ON_ALL_CASSANDRA_CQL_READY} might never happen
+ * <li>{@link #ON_CASSANDRA_CQL_READY} can be called multiple times with
different cassandraInstanceId values
+ * <li>{@link #ON_CASSANDRA_CQL_DISCONNECTED} can be called multiple times
with different cassandraInstanceId values
+ * </ul>
+ * <p>
+ * However, implementers should choose to implement methods assuming no
guarantees to the event sequence.
+ */
+@Singleton
+public class SidecarServerEvents
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SidecarServerEvents.class);
+
+ /**
+ * The {@link io.vertx.core.eventbus.EventBus} address where server start
events will be published. Server start
+ * will be published whenever Sidecar has successfully started and is
ready listening for requests.
+ */
+ public static final String ON_SERVER_START =
SidecarServerEvents.class.getName() + ".ON_SERVER_START";
+
+ /**
+ * The {@link io.vertx.core.eventbus.EventBus} address where server
stop/shutdown events will be published.
+ * Server stop events will be published whenever Sidecar is stopping or
shutting down.
+ */
+ public static final String ON_SERVER_STOP =
SidecarServerEvents.class.getName() + ".ON_SERVER_STOP";
+
+ /**
+ * The {@link io.vertx.core.eventbus.EventBus} address where events will
be published when a CQL connection for
+ * a given instance has been established. The instance identifier will be
passed as part of the message.
+ */
+ public static final String ON_CASSANDRA_CQL_READY = SidecarServerEvents.class.getName() + ".ON_CASSANDRA_CQL_READY";
Review Comment:
Instead of hardcoding the names, you can declare an enum. It stores all the
names in a list that can be accessed using the enum's ordinal.
```java
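import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

enum Event
{
    ON_SERVER_START;

    // Fully qualified event names, indexed by enum ordinal
    private static final List<String> EVENT_NAMES = Arrays.stream(values())
        .map(e -> e.getDeclaringClass().getName() + '.' + e.name())
        .collect(Collectors.toList());

    String eventName()
    {
        return EVENT_NAMES.get(ordinal());
    }
}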
```