yifan-c commented on code in PR #72:
URL: https://github.com/apache/cassandra-sidecar/pull/72#discussion_r1367368330


##########
src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java:
##########
@@ -20,7 +20,8 @@
 
 import java.util.List;
 
-import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.utils.CassandraAdapterDelegate;

Review Comment:
   `CassandraAdapterDelegate` does not seem to fit in the `utils` package. Among 
the existing packages, `cluster` would be a better fit. 



##########
src/main/dist/conf/sidecar.yaml:
##########
@@ -68,10 +68,20 @@ sidecar:
   port: 9043
   request_idle_timeout_millis: 300000 # this field expects integer value
   request_timeout_millis: 300000
+  tcp_keep_alive: false
+  accept_backlog: 1024
+  server_verticle_instances: 1
   throttle:
     stream_requests_per_sec: 5000
     delay_sec: 5
     timeout_sec: 10
+  traffic_shaping:
+    inbound_global_bandwidth_bps: 0               # 0 implies unthrottled, the inbound bandwidth in bytes per second
+    outbound_global_bandwidth_bps: 0              # 0 implies unthrottled, the outbound bandwidth in bytes per second
+    peak_outbound_global_bandwidth_bps: 419430400 # 400 Mbps, the peak outbound bandwidth in bytes per second

Review Comment:
   `Mbps` means megabits per second. 
   The value here is actually in `MBps` (megabytes per second).
   
   The two abbreviations are too easy to confuse, since they differ only in the 
upper-case B versus the lower-case b. 
   
   I would just put this in the comment: 
   
   "the peak outbound bandwidth in bytes per second. The default is 400 
mebibytes per second"
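   
   For reference, a minimal sketch of the arithmetic behind that wording. The 
class and method names are hypothetical and not part of the PR. It only shows 
that 419430400 bytes is 400 mebibytes, which is far more than 400 megabits.
   
   ```java
   // Hypothetical helper, not part of the PR: illustrates the unit arithmetic only.
   final class BandwidthUnits
   {
       static final long BYTES_PER_MEBIBYTE = 1024L * 1024L;
   
       // Converts a size in mebibytes (MiB) to bytes
       static long mebibytesToBytes(long mebibytes)
       {
           return mebibytes * BYTES_PER_MEBIBYTE;
       }
   
       // Converts a size in bytes to bits
       static long bytesToBits(long bytes)
       {
           return bytes * 8L;
       }
   
       public static void main(String[] args)
       {
           long peakBytesPerSec = mebibytesToBytes(400);
           System.out.println(peakBytesPerSec);              // 419430400
           System.out.println(bytesToBits(peakBytesPerSec)); // 3355443200
       }
   }
   ```
   
   So the configured default is roughly 3355 megabits per second, not 400.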



##########
src/main/java/org/apache/cassandra/sidecar/concurrent/ExecutorPools.java:
##########
@@ -119,9 +120,45 @@ public long setPeriodic(long delay, Handler<Long> handler)
         public long setPeriodic(long delay, Handler<Long> handler, boolean ordered)
         {
             return vertx.setPeriodic(delay,
-                                     id -> workerExecutor.executeBlocking(promise -> {
+                                     id -> workerExecutor.executeBlocking(() -> {
                                          handler.handle(id);
-                                         promise.complete();
+                                         return id;
+                                     }, ordered));

Review Comment:
   Instead of scheduling directly, it can just call the other `setPeriodic` overload.
   
   ```
           public long setPeriodic(long delay, Handler<Long> handler, boolean ordered)
           {
               return setPeriodic(delay, delay, handler, ordered);
           }
   ```



##########
src/main/java/org/apache/cassandra/sidecar/config/TrafficShapingConfiguration.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import io.netty.handler.traffic.AbstractTrafficShapingHandler;
+import io.netty.handler.traffic.GlobalTrafficShapingHandler;
+
+/**
+ * Configuration for the global traffic shaping options. These TCP server 
options enable configuration of
+ * bandwidth limiting. Both inbound and outbound bandwidth can be limited 
through these options.
+ */
+public interface TrafficShapingConfiguration
+{
+    /**
+     * Default inbound bandwidth limit in bytes/sec = 0 (0 implies unthrottled)
+     */
+    long DEFAULT_INBOUND_GLOBAL_BANDWIDTH_LIMIT = 0;

Review Comment:
   Why define the constants in the interface? How about moving them to 
`TrafficShapingConfigurationImpl`?
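   
   A minimal sketch of what that could look like, under assumptions: the 
accessor name `inboundGlobalBandwidthBytesPerSecond` and both constructors are 
hypothetical, and only the constant name is taken from the diff. The interface 
keeps the contract, and the default value lives with the implementation.
   
   ```java
   // Hypothetical sketch, not the PR's code: the interface exposes only the accessor.
   interface TrafficShapingConfiguration
   {
       // Assumed accessor name, for illustration only
       long inboundGlobalBandwidthBytesPerSecond();
   }
   
   // The default is an implementation detail, so it is declared here.
   final class TrafficShapingConfigurationImpl implements TrafficShapingConfiguration
   {
       // Default inbound bandwidth limit in bytes/sec (0 implies unthrottled)
       static final long DEFAULT_INBOUND_GLOBAL_BANDWIDTH_LIMIT = 0;
   
       private final long inboundGlobalBandwidthBytesPerSecond;
   
       TrafficShapingConfigurationImpl()
       {
           this(DEFAULT_INBOUND_GLOBAL_BANDWIDTH_LIMIT);
       }
   
       TrafficShapingConfigurationImpl(long inboundGlobalBandwidthBytesPerSecond)
       {
           this.inboundGlobalBandwidthBytesPerSecond = inboundGlobalBandwidthBytesPerSecond;
       }
   
       @Override
       public long inboundGlobalBandwidthBytesPerSecond()
       {
           return inboundGlobalBandwidthBytesPerSecond;
       }
   }
   ```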



##########
src/main/java/org/apache/cassandra/sidecar/config/ServiceConfiguration.java:
##########
@@ -33,11 +36,33 @@ public interface ServiceConfiguration
      */
     String host();
 
+    /**
+     * Returns a list of interfaces where the Sidecar process will bind and 
listen for connections. Defaults to
+     * the configured {@link #host()}.
+     *
+     * @return a list of interfaces where Sidecar will listen
+     */
+    default List<String> listenInterfaces()
+    {
+        return Collections.singletonList(Objects.requireNonNull(host(), "host 
must be provided"));
+    }
+
     /**
      * @return Sidecar's HTTP REST API port
      */
     int port();
 
+    /**
+     * Returns a list of ports where the Sidecar process will bind and listen 
for connections. Defaults to
+     * the configured {@link #port()}
+     *
+     * @return a list of ports where Sidecar will listen
+     */
+    default List<Integer> ports()
+    {
+        return Collections.singletonList(port());

Review Comment:
   The combination of a network interface array and a port array is confusing. 
   For example, for interface array [i1, i2] and port array [8000, 9000], does 
it listen on [i1:8000, i1:9000, i2:8000, i2:9000] or [i1:8000, i2:9000]?
   
   For better API clarity, I would just return a list of `SocketAddress`. 
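   
   A rough sketch of the idea, assuming `java.net.SocketAddress` (the Vert.x 
`SocketAddress` type would work similarly). The `ListenAddresses` class and its 
`of` method are hypothetical. Each entry names exactly one host and port pair, 
so no cross-product question arises.
   
   ```java
   import java.net.InetSocketAddress;
   import java.net.SocketAddress;
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical sketch, not the PR's code.
   final class ListenAddresses
   {
       // Parses "host:port" strings into one socket address per entry
       static List<SocketAddress> of(String... hostPorts)
       {
           List<SocketAddress> addresses = new ArrayList<>();
           for (String hostPort : hostPorts)
           {
               int separator = hostPort.lastIndexOf(':');
               String host = hostPort.substring(0, separator);
               int port = Integer.parseInt(hostPort.substring(separator + 1));
               // createUnresolved avoids a DNS lookup while building the list
               addresses.add(InetSocketAddress.createUnresolved(host, port));
           }
           return addresses;
       }
   
       public static void main(String[] args)
       {
           // Exactly two listeners: i1 on 8000 and i2 on 9000
           for (SocketAddress address : of("i1:8000", "i2:9000"))
           {
               System.out.println(address);
           }
       }
   }
   ```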



##########
src/main/java/org/apache/cassandra/sidecar/config/SslConfiguration.java:
##########
@@ -28,6 +28,29 @@ public interface SslConfiguration
      */
     boolean enabled();
 
+    /**
+     * Returns {@code true} if the OpenSSL engine should be used, {@code 
false} otherwise. The OpenSSL engine
+     * will only be enabled if the native libraries for OpenSSL have been 
loaded correctly.
+     *
+     * @return {@code true} if the OpenSSL engine should be used, {@code 
false} otherwise

Review Comment:
   nit: 
   - The description and the `@return` sentence are duplicated. 
   - Since configuring `true` does not guarantee OpenSSL is picked, it is only 
an intent or preference. How about renaming the config to `preferOpenSSL`?



##########
src/main/java/org/apache/cassandra/sidecar/tasks/HealthCheckPeriodicTask.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+
+/**
+ * Periodically checks the health of every instance configured in the {@link 
InstancesConfig}.
+ */
+@Singleton
+public class HealthCheckPeriodicTask implements PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(HealthCheckPeriodicTask.class);
+    private final SidecarConfiguration configuration;
+    private final InstancesConfig instancesConfig;
+    private final ExecutorPools.TaskExecutorPool internalPool;
+
+    @Inject
+    public HealthCheckPeriodicTask(SidecarConfiguration configuration,
+                                   InstancesConfig instancesConfig,
+                                   ExecutorPools executorPools)
+    {
+        this.configuration = configuration;
+        this.instancesConfig = instancesConfig;
+        internalPool = executorPools.internal();
+    }
+
+    @Override
+    public long initialDelay()
+    {
+        return configuration.healthCheckConfiguration().initialDelayMillis();
+    }
+
+    @Override
+    public long delay()
+    {
+        return configuration.healthCheckConfiguration().checkIntervalMillis();
+    }
+
+    /**
+     * Run health checks on all the configured instances
+     */
+    @Override
+    public void execute()
+    {
+        instancesConfig.instances()
+                       .forEach(instanceMetadata -> 
internalPool.executeBlocking(promise -> {
+                           try
+                           {
+                               instanceMetadata.delegate().healthCheck();
+                               promise.complete();
+                           }
+                           catch (Throwable cause)
+                           {
+                               promise.fail(cause);
+                               LOGGER.error("Unable to complete health check 
on instance={}",
+                                            instanceMetadata.id(), cause);
+                           }
+                       }));

Review Comment:
   nit: submit unordered, i.e. with `ordered == false`.



##########
src/main/java/org/apache/cassandra/sidecar/concurrent/ExecutorPools.java:
##########
@@ -119,9 +120,45 @@ public long setPeriodic(long delay, Handler<Long> handler)
         public long setPeriodic(long delay, Handler<Long> handler, boolean ordered)
         {
             return vertx.setPeriodic(delay,
-                                     id -> workerExecutor.executeBlocking(promise -> {
+                                     id -> workerExecutor.executeBlocking(() -> {
                                          handler.handle(id);
-                                         promise.complete();
+                                         return id;
+                                     }, ordered));
+        }
+
+        /**
+         * Set a periodic timer to fire every {@code delay} milliseconds with 
initial delay, at which point
+         * {@code handler} will be called with the id of the timer.
+         *
+         * @param initialDelay the initial delay in milliseconds
+         * @param delay        the delay in milliseconds, after which the 
timer will fire
+         * @param handler      the handler that will be called with the timer 
ID when the timer fires
+         * @return the unique ID of the timer
+         */
+        public long setPeriodic(long initialDelay, long delay, Handler<Long> 
handler)
+        {
+            return setPeriodic(initialDelay, delay, handler, false);
+        }
+
+        /**
+         * Set a periodic timer to fire every {@code delay} milliseconds with 
initial delay, at which point
+         * {@code handler} will be called with the id of the timer.
+         *
+         * @param initialDelay the initial delay in milliseconds
+         * @param delay        the delay in milliseconds, after which the 
timer will fire
+         * @param handler      the handler that will be called with the timer 
ID when the timer fires
+         * @param ordered      if true then executeBlocking is called several 
times on the same context, the
+         *                     executions for that context will be executed 
serially, not in parallel. if false
+         *                     then they will be no ordering guarantees
+         * @return the unique ID of the timer
+         */
+        public long setPeriodic(long initialDelay, long delay, Handler<Long> 
handler, boolean ordered)
+        {
+            return vertx.setPeriodic(initialDelay,
+                                     delay,
+                                     id -> workerExecutor.executeBlocking(() 
-> {
+                                         handler.handle(id);
+                                         return id;

Review Comment:
   Although the new library suggests using the new API that takes a `Callable`, 
one difference I noticed in `WorkerExecutorImpl` is that the implementation 
with `Callable` does not check whether the worker executor is closed, meaning the 
callable is scheduled on the closing pool regardless. 
   
   That could be a concern.
   
   I would revert and use the deprecated API. 



##########
src/main/java/org/apache/cassandra/sidecar/server/MainModule.java:
##########
@@ -241,11 +237,27 @@ public Router vertxRouter(Vertx vertx,
         return router;
     }
 
+    @Provides
+    @Singleton
+    public PeriodicTaskExecutor periodicTaskExecutor(Vertx vertx,
+                                                     ExecutorPools 
executorPools,
+                                                     HealthCheckPeriodicTask 
healthCheckTask)
+    {
+        EventBus eventBus = vertx.eventBus();
+        PeriodicTaskExecutor executor = new 
PeriodicTaskExecutor(executorPools);
+        eventBus.localConsumer(ON_SERVER_START, message -> 
executor.schedule(healthCheckTask));
+        eventBus.localConsumer(ON_SERVER_STOP, message -> 
executor.unschedule(healthCheckTask));

Review Comment:
   nit: the event bus message registration for the health check fits better in 
the `HealthCheckPeriodicTask` constructor. 
   
   The method here should just return an executor.



##########
src/main/java/org/apache/cassandra/sidecar/server/SidecarServerEvents.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.server;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import io.vertx.core.eventbus.MessageConsumer;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+
+/**
+ * Defines the {@link io.vertx.core.eventbus.EventBus} addresses where 
different notifications will be published
+ * during Sidecar startup/shutdown, as well as CQL connection availability.
+ *
+ * <p>The messages can be published multiple times depending on whether 
Sidecar is started or stopped
+ * during the lifetime of the application. Implementing consumers will need to 
deal with this expectation
+ * internally.
+ * <p>
+ * The expectation is that:
+ * <ul>
+ * <li>{@link #ON_SERVER_START} will happen first
+ * <li>{@link #ON_SERVER_STOP} can happen before {@link 
#ON_ALL_CASSANDRA_CQL_READY}
+ * <li>{@link #ON_SERVER_START} can only happen for any subsequent calls only 
after a {@link #ON_SERVER_STOP} message
+ * <li>{@link #ON_ALL_CASSANDRA_CQL_READY} might never happen
+ * <li>{@link #ON_CASSANDRA_CQL_READY} can be called multiple times with 
different cassandraInstanceId values
+ * <li>{@link #ON_CASSANDRA_CQL_DISCONNECTED} can be called multiple times 
with different cassandraInstanceId values
+ * </ul>
+ * <p>
+ * However, implementers should choose to implement methods assuming no 
guarantees to the event sequence.
+ */
+@Singleton
+public class SidecarServerEvents
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SidecarServerEvents.class);
+
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where server start 
events will be published. Server start
+     * will be published whenever Sidecar has successfully started and is 
ready listening for requests.
+     */
+    public static final String ON_SERVER_START = 
SidecarServerEvents.class.getName() + ".ON_SERVER_START";
+
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where server 
stop/shutdown events will be published.
+     * Server stop events will be published whenever Sidecar is stopping or 
shutting down.
+     */
+    public static final String ON_SERVER_STOP = 
SidecarServerEvents.class.getName() + ".ON_SERVER_STOP";
+
+    /**
+     * The {@link io.vertx.core.eventbus.EventBus} address where events will 
be published when a CQL connection for
+     * a given instance has been established. The instance identifier will be 
passed as part of the message.
+     */
+    public static final String ON_CASSANDRA_CQL_READY = 
SidecarServerEvents.class.getName() + ".ON_CASSANDRA_CQL_READY";

Review Comment:
   Instead of hardcoding the names, you can declare an enum. It stores 
all the names in a list that can be accessed using the enum's ordinal. 
   
   ```java
       enum Event
       {
           ON_SERVER_START;
   
           private static List<String> eventNames = Arrays.stream(values())
                                                          .map(e -> e.getDeclaringClass().getName() + '.' + e.name())
                                                          .collect(Collectors.toList());
   
           String eventName()
           {
               return eventNames.get(ordinal());
           }
       }
   ```


