yifan-c commented on code in PR #193: URL: https://github.com/apache/cassandra-sidecar/pull/193#discussion_r1962568090
########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.time.Duration; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * In memory representation of CDC and Kafka configurations from "configs" table in sidecar keyspace. + */ +public interface CdcConfig +{ + Logger LOGGER = LoggerFactory.getLogger(CdcConfig.class); + + String DEFAULT_JOB_ID = "test-job"; + int DEFAULT_MAX_WATERMARKER_SIZE = 400000; Review Comment: It is generally not advised to define constants in an interface. Can you relocate them to the implementation, `CdcConfigImpl`? ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.time.Duration; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * In memory representation of CDC and Kafka configurations from "configs" table in sidecar keyspace. + */ +public interface CdcConfig +{ + Logger LOGGER = LoggerFactory.getLogger(CdcConfig.class); + + String DEFAULT_JOB_ID = "test-job"; + int DEFAULT_MAX_WATERMARKER_SIZE = 400000; + + CdcConfig STUB = new CdcConfig() + { + }; Review Comment: Can this be removed and declared in `CdcConfigImpl` instead? It is not used, so you can probably just delete it. ########## client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java: ########## @@ -544,6 +550,20 @@ public void streamCdcSegments(SidecarInstance sidecarInstance, .build(), streamConsumer); } + public CompletableFuture<GetServicesConfigPayload> getServiceConfig() + { + return executor.executeRequestAsync(requestBuilder() + .request(new GetServiceConfigRequest()) + .build()); + } + + public CompletableFuture<PutCdcServiceConfigPayload> putCdcServiceConfig(ServiceConfig serviceConfig, Map<String, String> config) + { + return executor.executeRequestAsync(requestBuilder() + .request(new PutServiceConfigRequest(serviceConfig, new PutCdcServiceConfigPayload(config))) + .build()); Review Comment: 1. align the chained invocations. 2. add javadocs for both methods ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final String CDC_CONFIG_DC_KEY = "dc"; + private static final String CDC_CONFIG_LOG_ONLY_KEY = "log_only"; + private static final String CDC_CONFIG_PERSIST_STATE_KEY = "persist_state"; + private static final String CDC_CONFIG_ENV_KEY = "env"; + private static final String KAFKA_CONFIG_TOPIC_KEY = "topic"; + private static final String KAFKA_FORMAT_TYPE_CONFIG_TOPIC_KEY = "topic_format_type"; + private static final String CDC_ENABLED_KEY = "cdc_enabled"; + private static final String KAFKA_CONFIG_JOB_ID_KEY = "jobId"; + private static final String WATERMARK_WINDOW_KEY = "watermark_seconds"; + private static final String MICROBATCH_DELAY_KEY = "microbatch_delay_millis"; + private static final String CDC_CONFIG_MAX_COMMIT_LOGS_KEY = "max_commit_logs"; + private static final String CDC_MAX_WATERMARKER_SIZE_KEY = "max_watermarker_size"; + private static final String CDC_FAIL_KAFKA_ERRORS = "fail_kafka_errors"; + private static final String CDC_FAIL_KAFKA_TOO_LARGE_ERRORS = "fail_kafka_too_large_errors"; + private static final String CDC_PERSIST_DELAY_MILLIS = "persist_delay_millis"; + + private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; + private final CdcConfiguration cdcConfiguration; + private final CdcConfigAccessor cdcConfigAccessor; + private final KafkaConfigAccessor kafkaConfigAccessor; + private final List<Callable<?>> configChangeListeners = Collections.synchronizedList(new ArrayList<>()); + private final ConfigRefreshNotifier configRefreshNotifier; + private volatile Map<String, String> kafkaConfigMappings = Map.of(); + private volatile Map<String, String> cdcConfigMappings = Map.of(); + + @Inject + public CdcConfigImpl(CdcConfiguration cdcConfiguration, + SchemaKeyspaceConfiguration schemaKeyspaceConfiguration, + CdcConfigAccessor cdcConfigAccessor, + KafkaConfigAccessor kafkaConfigAccessor, + PeriodicTaskExecutor periodicTaskExecutor) + { + this.schemaKeyspaceConfiguration = schemaKeyspaceConfiguration; + this.cdcConfiguration = cdcConfiguration; + this.cdcConfigAccessor = cdcConfigAccessor; + this.kafkaConfigAccessor = kafkaConfigAccessor; + + if (this.schemaKeyspaceConfiguration.isEnabled()) + { + this.configRefreshNotifier = new ConfigRefreshNotifier(); + periodicTaskExecutor.schedule(configRefreshNotifier); + } + else + { + this.configRefreshNotifier = null; + } + } + + @Override + public Map<String, Object> kafkaConfigs() + { + Map<String, Object> kafkaConfigs = new HashMap<>(); + kafkaConfigs.putAll(getAuthConfigs()); + kafkaConfigs.putAll(kafkaConfigMappings); + return ImmutableMap.copyOf(kafkaConfigs); + } + + @Override + public Map<String, Object> cdcConfigs() + { + return ImmutableMap.copyOf(cdcConfigMappings); + } + + @Override + public boolean isConfigReady() + { + return cdcConfigAccessor.isSchemaInitialized() + && !kafkaConfigMappings.isEmpty() + && !cdcConfigMappings.isEmpty(); + } + + @Override + public String kafkaTopic() + { + return cdcConfigMappings.getOrDefault(KAFKA_CONFIG_TOPIC_KEY, null); + } + + @NotNull + public TopicFormatType topicFormat() + { + return TopicFormatType.valueOf(cdcConfigMappings.getOrDefault(KAFKA_FORMAT_TYPE_CONFIG_TOPIC_KEY, TopicFormatType.STATIC.name())); + } + + public boolean cdcEnabled() + { + return Boolean.parseBoolean(cdcConfigMappings.getOrDefault(CDC_ENABLED_KEY, "true")); + } + + @Override + public String jobId() + { + return cdcConfigMappings.getOrDefault(KAFKA_CONFIG_JOB_ID_KEY, DEFAULT_JOB_ID); + } + + @Override + public boolean logOnly() + { + return getBool(CDC_CONFIG_LOG_ONLY_KEY, false); + } + + @Override + public boolean persistEnabled() + { + return getBool(CDC_CONFIG_PERSIST_STATE_KEY, true); + } + + @Override + public boolean failOnRecordTooLargeError() + { + return getBool(CDC_FAIL_KAFKA_TOO_LARGE_ERRORS, false); + } + + @Override + public boolean failOnKafkaError() + { + return getBool(CDC_FAIL_KAFKA_ERRORS, true); + } + + @Override + public Duration persistDelay() + { + return Duration.ofMillis(getInt(CDC_PERSIST_DELAY_MILLIS, 1000)); + } + + @Override + public String dc() + { + return cdcConfigMappings.get(CDC_CONFIG_DC_KEY); + } + + @Override + public Duration watermarkWindow() + { + // this prop sets the maximum duration age accepted by CDC, any mutations with write timestamps older than + // the watermark window will be dropped with log message "Exclude the update due to out of the allowed time window." + final int seconds = getInt(WATERMARK_WINDOW_KEY, 259200); + return Duration.ofSeconds(seconds); + } + + @Override + public Duration minDelayBetweenMicroBatches() + { + // this prop allows us to add a minimum delay between CDC micro batches + // usually if we need to slow down CDC + // e.g. if CDC is started with a large backlog of commit log segments and is working hard to process. + // e.g. or if there is a large data dump or burst of writes that causes high CDC activity. + final long millis = Long.parseLong(cdcConfigMappings.getOrDefault(MICROBATCH_DELAY_KEY, "1000")); + return Duration.ofMillis(millis); + } + + @Override + public String env() + { + return cdcConfigMappings.getOrDefault(CDC_CONFIG_ENV_KEY, ""); + } + + @Override + public int maxCommitLogsPerInstance() + { + return getInt(CDC_CONFIG_MAX_COMMIT_LOGS_KEY, CdcConfig.super::maxCommitLogsPerInstance); + } + + @Override + public int maxWatermarkerSize() + { + return getInt(CDC_MAX_WATERMARKER_SIZE_KEY, CdcConfig.super::maxWatermarkerSize); + } + + protected boolean getBool(String key, boolean orDefault) + { + String bool = cdcConfigMappings.get(key); + return bool != null ? Boolean.parseBoolean(bool) : orDefault; + } + + protected int getInt(String key, int orDefault) + { + return getInt(key, () -> orDefault); + } + + protected int getInt(String key, Supplier<Integer> orDefault) + { + String aInt = cdcConfigMappings.get(key); + return aInt != null ? Integer.valueOf(aInt) : orDefault.get(); + } + + /** + * Adds a listener to service config changes + * + * @param listener The listener to call + */ + public void registerConfigChangeListener(Callable<?> listener) + { + this.configChangeListeners.add(listener); + } + + private Map<String, Object> getAuthConfigs() + { + Map<String, Object> authConfigs = new HashMap<>(); + authConfigs.put("pie.queue.kaffe.client.private.key.location", cdcConfiguration.kafkaClientPrivateKeyPath()); + return authConfigs; + } + + @VisibleForTesting + void forceExecuteNotifier() + { + if (configRefreshNotifier != null && + configRefreshNotifier.scheduleDecision() == ScheduleDecision.EXECUTE) + { + configRefreshNotifier.execute(Promise.promise()); + } + } + + @VisibleForTesting + ConfigRefreshNotifier configRefreshNotifier() + { + return configRefreshNotifier; + } + + class ConfigRefreshNotifier implements PeriodicTask + { + @Override + public DurationSpec delay() + { + return cdcConfiguration.cdcConfigRefreshTime(); + } + + @Override + public void execute(Promise<Void> promise) + { + for (Callable<?> listener : configChangeListeners) + { + try + { + listener.call(); + } + catch (Throwable e) + { + LOGGER.error(String.format("There was an error with callback %s", listener), e); Review Comment: Do not use string format. ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/UpdateCdcConfigRequestValidationHandler.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + + +/** + * Updating a service config from "configs" table in sidecar keyspace should perform some + * validation checks before updating the config. + * {@link DeleteCdcConfigRequestValidationHandler} performs those validations before the + * update operation. + */ +@Singleton +public class UpdateCdcConfigRequestValidationHandler implements Handler<RoutingContext> +{ + @Override + public void handle(RoutingContext context) + { + final JsonObject payload = context.getBodyAsJson(); Review Comment: ```suggestion JsonObject payload = context.body().asJsonObject(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidators.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidators} has static + * utility methods for some of those validations. + */ +public class ServiceConfigValidators Review Comment: Can you rename it to the singular form? Prefer create utility instance (and inject as singleton), instead of static helpers. This way, it is possible to bind to a different implementation, in case we want to change the validation behavior. ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/DeleteServiceConfigHandler.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + + +import java.util.Collections; +import java.util.NoSuchElementException; +import java.util.Set; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; + +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.db.ConfigAccessor; +import org.apache.cassandra.sidecar.db.ConfigAccessorFactory; +import org.apache.cassandra.sidecar.routes.AccessProtected; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Provides REST endpoint for deleting CDC/Kafka configs + */ +@Singleton +public class DeleteServiceConfigHandler implements Handler<RoutingContext>, AccessProtected +{ + private final ConfigAccessorFactory configAccessorFactory; + + @Inject + public DeleteServiceConfigHandler(final ConfigAccessorFactory configAccessorFactory) Review Comment: final ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/UpdateServiceConfigHandler.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.db.ConfigAccessor; +import org.apache.cassandra.sidecar.db.ConfigAccessorFactory; +import org.apache.cassandra.sidecar.routes.AccessProtected; + +/** + * Provides REST endpoint for updating CDC/Kafka configs. + */ +@Singleton +public class UpdateServiceConfigHandler implements Handler<RoutingContext>, AccessProtected +{ + private final ConfigAccessorFactory configAccessorFactory; + + @Inject + public UpdateServiceConfigHandler(final ConfigAccessorFactory configAccessorFactory) + { + this.configAccessorFactory = configAccessorFactory; + } + + @Override + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.CDC.toAuthorization()); + } + + @Override + public void handle(RoutingContext context) + { + JsonObject body = context.getBodyAsJson(); + final String service = context.pathParam(ConfigPayloadParams.SERVICE); + ConfigAccessor accessor = configAccessorFactory.getConfigAccessor(service); + Map<String, String> config = context.getBodyAsJson().getJsonObject(ConfigPayloadParams.CONFIG).getMap().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> (String) e.getValue())); + accessor.storeConfig(config); Review Comment: Here, the body is extract as json for the second time and the third time. (The first time of extraction is at the validation handler). Let's just merge the validation handler. Extract/materialize the json body just once, and keep the reference. ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessorFactory.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import com.google.inject.Inject; +import org.apache.cassandra.sidecar.routes.cdc.ValidServices; + +/** + * Factory for creating config objects based on the service name. + */ +public class ConfigAccessorFactory +{ + private final KafkaConfigAccessor kafkaConfigAccessor; + private final CdcConfigAccessor cdcConfigAccessor; Review Comment: Remove them. And have a map instead. ``` Map<String, ConfigAccessor> configAccessors ``` where the key is service name in lower case. ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final String CDC_CONFIG_DC_KEY = "dc"; + private static final String CDC_CONFIG_LOG_ONLY_KEY = "log_only"; + private static final String CDC_CONFIG_PERSIST_STATE_KEY = "persist_state"; + private static final String CDC_CONFIG_ENV_KEY = "env"; + private static final String KAFKA_CONFIG_TOPIC_KEY = "topic"; + private static final String KAFKA_FORMAT_TYPE_CONFIG_TOPIC_KEY = "topic_format_type"; + private static final String CDC_ENABLED_KEY = "cdc_enabled"; + private static final String KAFKA_CONFIG_JOB_ID_KEY = "jobId"; + private static final String WATERMARK_WINDOW_KEY = "watermark_seconds"; + private static final String MICROBATCH_DELAY_KEY = "microbatch_delay_millis"; + private static final String CDC_CONFIG_MAX_COMMIT_LOGS_KEY = "max_commit_logs"; + private static final String CDC_MAX_WATERMARKER_SIZE_KEY = "max_watermarker_size"; + private static final String CDC_FAIL_KAFKA_ERRORS = "fail_kafka_errors"; + private static final String CDC_FAIL_KAFKA_TOO_LARGE_ERRORS = "fail_kafka_too_large_errors"; + private static final String CDC_PERSIST_DELAY_MILLIS = "persist_delay_millis"; Review Comment: nit: define enums for the keys? and use the `name()` of each enum. It is less verbose. ########## server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java: ########## @@ -28,4 +29,14 @@ public interface CdcConfiguration * @return segment hard link cache expiration time used in {@link org.apache.cassandra.sidecar.cdc.CdcLogCache} */ SecondBoundConfiguration segmentHardLinkCacheExpiry(); + + /** + * + * @return returns if cdc feature is enabled + */ + boolean isEnabled(); + + String kafkaClientPrivateKeyPath(); + + MillisecondBoundConfiguration cdcConfigRefreshTime(); Review Comment: javadoc ########## server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java: ########## @@ -32,22 +33,50 @@ public class CdcConfigurationImpl implements CdcConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigurationImpl.class); + public static final String IS_ENABLED_PROPERTY = "is_enabled"; + public static final String CONFIGURATION_REFRESH_TIME_PROPERTY = "config_refresh_time"; public static final String SEGMENT_HARD_LINK_CACHE_EXPIRY_PROPERTY = "segment_hardlink_cache_expiry"; + public static final boolean DEFAULT_IS_ENABLED = false; + public static final MillisecondBoundConfiguration DEFAULT_CDC_CONFIG_REFRESH_TIME = + MillisecondBoundConfiguration.parse("30s"); public static final SecondBoundConfiguration DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY = - SecondBoundConfiguration.parse("5m"); + SecondBoundConfiguration.parse("5m"); + protected boolean isEnabled; + protected MillisecondBoundConfiguration cdcConfigRefreshTime; protected SecondBoundConfiguration segmentHardLinkCacheExpiry; + public CdcConfigurationImpl() { this.segmentHardLinkCacheExpiry = DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY; + this.cdcConfigRefreshTime = DEFAULT_CDC_CONFIG_REFRESH_TIME; + this.isEnabled = DEFAULT_IS_ENABLED; } - public CdcConfigurationImpl(SecondBoundConfiguration segmentHardLinkCacheExpiry) + public CdcConfigurationImpl(boolean isEnabled, + MillisecondBoundConfiguration cdcConfigRefreshTime, + SecondBoundConfiguration segmentHardLinkCacheExpiry) Review Comment: I think you need to annotate with `@JsonCreator` and `@JsonProperty`. Can you add the ser/deser test to verify it is working? ########## server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java: ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.cdc; + +import java.util.Map; +import java.util.concurrent.Callable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.util.Modules; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class CdcConfigImplTest +{ + private PeriodicTaskExecutor executor; + + @BeforeEach + void setup() + { + Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule())); + executor = injector.getInstance(PeriodicTaskExecutor.class); + } Review Comment: By creating `PeriodicTaskExecutor`, guice creates `Vertx` internally, and many other objects. Can you just create the `PeriodicTaskExecutor` the same way as `org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutorTest`? And remember to teardown the resources like `vertx` and `executorPools`. ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final String CDC_CONFIG_DC_KEY = "dc"; + private static final String CDC_CONFIG_LOG_ONLY_KEY = "log_only"; + private static final String CDC_CONFIG_PERSIST_STATE_KEY = "persist_state"; + private static final String CDC_CONFIG_ENV_KEY = "env"; + private static final String KAFKA_CONFIG_TOPIC_KEY = "topic"; + private static final String KAFKA_FORMAT_TYPE_CONFIG_TOPIC_KEY = "topic_format_type"; + private static final String CDC_ENABLED_KEY = "cdc_enabled"; + private static final String KAFKA_CONFIG_JOB_ID_KEY = "jobId"; + private static final String WATERMARK_WINDOW_KEY = "watermark_seconds"; + private static final String MICROBATCH_DELAY_KEY = "microbatch_delay_millis"; + private static final String CDC_CONFIG_MAX_COMMIT_LOGS_KEY = "max_commit_logs"; + private static final String CDC_MAX_WATERMARKER_SIZE_KEY = "max_watermarker_size"; + private static final String CDC_FAIL_KAFKA_ERRORS = "fail_kafka_errors"; + private static final String CDC_FAIL_KAFKA_TOO_LARGE_ERRORS = "fail_kafka_too_large_errors"; + private static final String CDC_PERSIST_DELAY_MILLIS = "persist_delay_millis"; + + private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; + private final CdcConfiguration cdcConfiguration; + private final CdcConfigAccessor cdcConfigAccessor; + private final KafkaConfigAccessor kafkaConfigAccessor; + private final List<Callable<?>> configChangeListeners = Collections.synchronizedList(new ArrayList<>()); Review Comment: It is a callable, but the result is not consumed anywhere. Can you change it to `ThrowingRunnable`? ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidators.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidators} has static + * utility methods for some of those validations. + */ +public class ServiceConfigValidators +{ + public static void verifyValidService(RoutingContext context, JsonObject payload) + { + final String requestService = context.pathParam(ConfigPayloadParams.SERVICE); + if (!Stream.of(ValidServices.values()).anyMatch(v -> v.serviceName.equals(requestService))) + { + final Set<String> services = Stream.of(ValidServices.values()).map(v -> v.serviceName).collect(Collectors.toSet()); + final String supportedServices = String.join(", ", services); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid service passed. Supported services: " + + supportedServices); + } + } + + public static void verifyValidConfig(RoutingContext context, JsonObject payload) + { + try + { + payload.getJsonObject(ConfigPayloadParams.CONFIG).getMap(); + } + catch (final ClassCastException ex) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid config passed"); + } + } + + public static void verifyValidPayload(RoutingContext context, JsonObject payload) Review Comment: `validatePayload` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidators.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidators} has static + * utility methods for some of those validations. + */ +public class ServiceConfigValidators +{ + public static void verifyValidService(RoutingContext context, JsonObject payload) Review Comment: Once `ValidService` is renamed, please update the method name to `validateService` It is the validator. Its action is "validate", not "verify" ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ValidServices.java: ########## @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +/** + * Enum representing various services inside config table in sidecar internal keyspace. + */ +public enum ValidServices Review Comment: nit: the `Valid` in the name does not provide much clarity. It makes me wondering if there is another set for "InvalidServices`. Can you rename it to just `Service`? (In the singular form) ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidators.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidators} has static + * utility methods for some of those validations. + */ +public class ServiceConfigValidators +{ + public static void verifyValidService(RoutingContext context, JsonObject payload) + { + final String requestService = context.pathParam(ConfigPayloadParams.SERVICE); + if (!Stream.of(ValidServices.values()).anyMatch(v -> v.serviceName.equals(requestService))) + { + final Set<String> services = Stream.of(ValidServices.values()).map(v -> v.serviceName).collect(Collectors.toSet()); + final String supportedServices = String.join(", ", services); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid service passed. Supported services: " + + supportedServices); + } + } + + public static void verifyValidConfig(RoutingContext context, JsonObject payload) Review Comment: rename to `validateConfig` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/UpdateCdcConfigRequestValidationHandler.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + + +/** + * Updating a service config from "configs" table in sidecar keyspace should perform some + * validation checks before updating the config. + * {@link DeleteCdcConfigRequestValidationHandler} performs those validations before the + * update operation. + */ +@Singleton +public class UpdateCdcConfigRequestValidationHandler implements Handler<RoutingContext> +{ + @Override + public void handle(RoutingContext context) + { + final JsonObject payload = context.getBodyAsJson(); + ServiceConfigValidators.verifyValidPayload(context, payload); + ServiceConfigValidators.verifyValidService(context, payload); + ServiceConfigValidators.verifyValidConfig(context, payload); + context.next(); + } +} Review Comment: missing `requiredAuthorizations` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/UpdateCdcConfigRequestValidationHandler.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + + +/** + * Updating a service config from "configs" table in sidecar keyspace should perform some + * validation checks before updating the config. + * {@link DeleteCdcConfigRequestValidationHandler} performs those validations before the Review Comment: I think you meant to have `{@link UpdateCdcConfigRequestValidationHandler}` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/UpdateCdcConfigRequestValidationHandler.java: ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + + +/** + * Updating a service config from "configs" table in sidecar keyspace should perform some + * validation checks before updating the config. + * {@link DeleteCdcConfigRequestValidationHandler} performs those validations before the + * update operation. + */ +@Singleton +public class UpdateCdcConfigRequestValidationHandler implements Handler<RoutingContext> Review Comment: As mentioned, you can inject the `ServiceConfigValidator` singleton. ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/DeleteCdcConfigRequestValidationHandler.java: ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RoutingContext; + + +/** + * Deleting a service config from "configs" table in sidecar keyspace should perform some + * validation checks before deleting the config. {@link DeleteCdcConfigRequestValidationHandler} + * performs those validations before the delete operation. + */ +@Singleton +public class DeleteCdcConfigRequestValidationHandler implements Handler<RoutingContext> +{ + @Override + public void handle(RoutingContext context) + { + final JsonObject payload = context.getBodyAsJson(); Review Comment: ```suggestion JsonObject payload = context.body().asJsonObject(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/DeleteServiceConfigHandler.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + + +import java.util.Collections; +import java.util.NoSuchElementException; +import java.util.Set; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; + +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.db.ConfigAccessor; +import org.apache.cassandra.sidecar.db.ConfigAccessorFactory; +import org.apache.cassandra.sidecar.routes.AccessProtected; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Provides REST endpoint for deleting CDC/Kafka configs + */ +@Singleton +public class DeleteServiceConfigHandler implements Handler<RoutingContext>, AccessProtected +{ + private final ConfigAccessorFactory configAccessorFactory; + + @Inject + public DeleteServiceConfigHandler(final ConfigAccessorFactory configAccessorFactory) + { + this.configAccessorFactory = configAccessorFactory; + } + + @Override + public void handle(RoutingContext context) + { + final String service = context.pathParam(ConfigPayloadParams.SERVICE); + final ConfigAccessor accessor = configAccessorFactory.getConfigAccessor(service); Review Comment: remove `final`s ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/DeleteServiceConfigHandler.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + + +import java.util.Collections; +import java.util.NoSuchElementException; +import java.util.Set; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.Handler; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; + +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.db.ConfigAccessor; +import org.apache.cassandra.sidecar.db.ConfigAccessorFactory; +import org.apache.cassandra.sidecar.routes.AccessProtected; +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Provides REST endpoint for deleting CDC/Kafka configs + */ +@Singleton +public class DeleteServiceConfigHandler implements Handler<RoutingContext>, AccessProtected +{ + private final ConfigAccessorFactory configAccessorFactory; + + @Inject + public DeleteServiceConfigHandler(final ConfigAccessorFactory configAccessorFactory) + { + this.configAccessorFactory = configAccessorFactory; + } + + @Override + public void handle(RoutingContext context) + { + final String service = context.pathParam(ConfigPayloadParams.SERVICE); + final ConfigAccessor accessor = configAccessorFactory.getConfigAccessor(service); + try + { + accessor.deleteConfig(); + } + catch (final NoSuchElementException ex) Review Comment: and the `final` here ########## server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java: ########## @@ -350,6 +355,11 @@ public Router vertxRouter(Vertx vertx, AbortRestoreJobHandler abortRestoreJobHandler, CreateRestoreSliceHandler createRestoreSliceHandler, RestoreJobProgressHandler restoreJobProgressHandler, + UpdateServiceConfigHandler updateServiceConfigHandler, + UpdateCdcConfigRequestValidationHandler updateCDCConfigRequestValidationHandler, + DeleteServiceConfigHandler deleteServiceConfigHandler, + DeleteCdcConfigRequestValidationHandler deleteCDCConfigRequestValidationHandler, Review Comment: I do not see much value with those 2 additional validation handlers. They are not shared anywhere. It would make sense to just merge them into the corresponding handlers, and avoiding extract the body as json object multiple times. ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/GetServiceConfigHandler.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.data.GetServicesConfigPayload; +import org.apache.cassandra.sidecar.db.ConfigAccessor; +import org.apache.cassandra.sidecar.db.ConfigAccessorFactory; +import org.apache.cassandra.sidecar.routes.AccessProtected; + +/** + * Provides REST endpoint for getting all the configs in "configs" table in sidecar internal keyspace. + */ +@Singleton +public class GetServiceConfigHandler implements Handler<RoutingContext>, AccessProtected +{ + private final ConfigAccessorFactory configAccessorFactory; + + @Inject + public GetServiceConfigHandler(final ConfigAccessorFactory configAccessorFactory) + { + this.configAccessorFactory = configAccessorFactory; + } + + @Override + public Set<Authorization> requiredAuthorizations() + { + return Collections.singleton(BasicPermissions.CDC.toAuthorization()); + } + + @Override + public void handle(RoutingContext context) + { + final List<GetServicesConfigPayload.Service> services = new ArrayList<>(); Review Comment: drop `final` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/UpdateServiceConfigHandler.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.db.ConfigAccessor; +import org.apache.cassandra.sidecar.db.ConfigAccessorFactory; +import org.apache.cassandra.sidecar.routes.AccessProtected; + +/** + * Provides REST endpoint for updating CDC/Kafka configs. + */ +@Singleton +public class UpdateServiceConfigHandler implements Handler<RoutingContext>, AccessProtected +{ + private final ConfigAccessorFactory configAccessorFactory; + + @Inject + public UpdateServiceConfigHandler(final ConfigAccessorFactory configAccessorFactory) Review Comment: remove `final` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessorImpl.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.ConfigsSchema; +import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.routes.cdc.ValidServices; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +/** + * Configurations for CDC feature are stored inside a table "config" in an internal sidecar keyspace. + * {@link ConfigAccessorImpl} is an accessor for the above-mentioned table and encapsulates database + * access operations of the "config" table. + */ +public abstract class ConfigAccessorImpl extends DatabaseAccessor<ConfigsSchema> implements ConfigAccessor +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigAccessorImpl.class); + private final ValidServices service = service(); + private final SidecarSchema sidecarSchema; + + protected ConfigAccessorImpl(InstanceMetadataFetcher instanceMetadataFetcher, + ConfigsSchema configsSchema, + CQLSessionProvider sessionProvider, + SidecarSchema sidecarSchema) + { + super(configsSchema, sessionProvider); + this.sidecarSchema = sidecarSchema; + } + + public abstract ValidServices service(); + + @Override + public ServiceConfig getConfig() + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema + .selectConfig() + .bind(service.serviceName); + Row row = execute(statement).one(); + if (row == null || row.isNull(0)) + { + LOGGER.debug(String.format("No %s configs are present in the table C* table", service.serviceName)); + return new ServiceConfig(Map.of()); + } + return ServiceConfig.from(row); + } + + @Override + public ServiceConfig storeConfig(Map<String, String> config) + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema + .insertConfig() + .bind(service.serviceName, config); + execute(statement); + return new ServiceConfig(config); + } + + @Override + public Optional<ServiceConfig> storeConfigIfNotExists(Map<String, String> config) Review Comment: Not used anywhere. Is there a mistake? If it is no longer needed, can you delete the method as well as the relevant fields in the schema class. ########## server/src/main/java/org/apache/cassandra/sidecar/db/schema/ConfigsSchema.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.jetbrains.annotations.NotNull; + +/** + * {@link ConfigsSchema} holds all prepared statements needed for talking to Cassandra for various actions + * related to storing and updating CDC and Kafka configs. + */ +@Singleton +public class ConfigsSchema extends TableSchema +{ + private static final String CONFIGS_TABLE_NAME = "configs"; + private final SchemaKeyspaceConfiguration keyspaceConfig; + + // prepared statements + private PreparedStatement selectConfig; + private PreparedStatement insertConfig; + private PreparedStatement insertConfigIfNotExist; + private PreparedStatement deleteConfig; + + @Inject + public ConfigsSchema(ServiceConfiguration configuration) + { + this.keyspaceConfig = configuration.schemaKeyspaceConfiguration(); + } + + @Override + protected String keyspaceName() + { + return keyspaceConfig.keyspace(); + } + + @Override + protected void prepareStatements(@NotNull Session session) + { + selectConfig = prepare(selectConfig, session, CqlLiterals.selectConfig(keyspaceConfig)); + insertConfig = prepare(insertConfig, session, CqlLiterals.insertConfig(keyspaceConfig)); + insertConfigIfNotExist = prepare(insertConfigIfNotExist, session, CqlLiterals.insertConfigIfNotExist(keyspaceConfig)); + deleteConfig = prepare(deleteConfig, session, CqlLiterals.deleteConfig(keyspaceConfig)); + } + + @Override + protected String tableName() + { + return CONFIGS_TABLE_NAME; + } + + @Override + protected boolean exists(@NotNull Metadata metadata) + { + KeyspaceMetadata ksMetadata = metadata.getKeyspace(keyspaceConfig.keyspace()); + if (ksMetadata == null) + return false; + return ksMetadata.getTable(CONFIGS_TABLE_NAME) != null; + } + + @Override + protected String createSchemaStatement() + { + return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" + + " service text," + + " config map<text, text>," + + " PRIMARY KEY (service))", + keyspaceConfig.keyspace(), CONFIGS_TABLE_NAME); + } + + public PreparedStatement selectConfig() + { + return selectConfig; + } + + public PreparedStatement insertConfig() + { + return insertConfig; + } + + public PreparedStatement insertConfigIfNotExists() + { + return insertConfig; Review Comment: Wrong object returned ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessorFactory.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import com.google.inject.Inject; +import org.apache.cassandra.sidecar.routes.cdc.ValidServices; + +/** + * Factory for creating config objects based on the service name. + */ +public class ConfigAccessorFactory +{ + private final KafkaConfigAccessor kafkaConfigAccessor; + private final CdcConfigAccessor cdcConfigAccessor; + + @Inject + public ConfigAccessorFactory(KafkaConfigAccessor kafkaConfigAccessor, + CdcConfigAccessor cdcConfigAccessor) + { + this.kafkaConfigAccessor = kafkaConfigAccessor; + this.cdcConfigAccessor = cdcConfigAccessor; + } + + public ConfigAccessor getConfigAccessor(final String service) + { + if (service.equals(ValidServices.KAFKA.serviceName)) + { + return kafkaConfigAccessor; + } + + if (service.equals(ValidServices.CDC.serviceName)) + { + return cdcConfigAccessor; + } + + throw new RuntimeException("Couldn't find a db accessor for service " + service); Review Comment: Can you create a map and do the look up? `configAccessors.get(service.toLowerCase())`. If the result it null, it throws. ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessorImpl.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.ConfigsSchema; +import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.routes.cdc.ValidServices; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +/** + * Configurations for CDC feature are stored inside a table "config" in an internal sidecar keyspace. + * {@link ConfigAccessorImpl} is an accessor for the above-mentioned table and encapsulates database + * access operations of the "config" table. + */ +public abstract class ConfigAccessorImpl extends DatabaseAccessor<ConfigsSchema> implements ConfigAccessor +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigAccessorImpl.class); + private final ValidServices service = service(); + private final SidecarSchema sidecarSchema; + + protected ConfigAccessorImpl(InstanceMetadataFetcher instanceMetadataFetcher, + ConfigsSchema configsSchema, + CQLSessionProvider sessionProvider, + SidecarSchema sidecarSchema) + { + super(configsSchema, sessionProvider); + this.sidecarSchema = sidecarSchema; + } + + public abstract ValidServices service(); + + @Override + public ServiceConfig getConfig() + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema + .selectConfig() + .bind(service.serviceName); + Row row = execute(statement).one(); + if (row == null || row.isNull(0)) + { + LOGGER.debug(String.format("No %s configs are present in the table C* table", service.serviceName)); + return new ServiceConfig(Map.of()); + } + return ServiceConfig.from(row); + } + + @Override + public ServiceConfig storeConfig(Map<String, String> config) + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema + .insertConfig() + .bind(service.serviceName, config); + execute(statement); + return new ServiceConfig(config); + } + + @Override + public Optional<ServiceConfig> storeConfigIfNotExists(Map<String, String> config) + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema + .insertConfigIfNotExists() + .bind(service.serviceName, config); + ResultSet resultSet = execute(statement); + if (resultSet.wasApplied()) + { + return Optional.of(new ServiceConfig(config)); + } + return Optional.empty(); + } + + @Override + public void deleteConfig() + { + sidecarSchema.ensureInitialized(); + BoundStatement deleteStatement = tableSchema + .deleteConfig() + .bind(service.serviceName); + execute(deleteStatement); Review Comment: align the chained invocations here and elsewhere in this file. ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessor.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +/** + * CDC configs are stored inside "configs" table of sidecar keyspace. This is an interface for + * database accessor of "configs" table + */ +public interface ConfigAccessor +{ + ServiceConfig getConfig(); + + ServiceConfig storeConfig(final Map<String, String> config); + + Optional<ServiceConfig> storeConfigIfNotExists(final Map<String, String> config); + + void deleteConfig(); + + boolean isSchemaInitialized(); Review Comment: javadoc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org