frankgh commented on code in PR #193: URL: https://github.com/apache/cassandra-sidecar/pull/193#discussion_r1980038873
########## client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java: ########## @@ -121,6 +121,12 @@ public final class ApiEndpointsV1 public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + "/segments"; public static final String STREAM_CDC_SEGMENTS_ROUTE = LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM; + public static final String SERVICES_PATH = "/services"; + public static final String SERVICE_PARAM = ":service"; + public static final String CONFIG = "/config"; + public static final String SERVICE_CONFIG_ROUTE = API_V1 + SERVICES_PATH + SERVICE_PARAM + CONFIG; + public static final String GET_SERVICES_CONFIG_ROUTE = API_V1 + SERVICES_PATH; Review Comment: NIT ```suggestion public static final String SERVICES_CONFIG_ROUTE = API_V1 + SERVICES_PATH; ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/GetServiceConfigRequest.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.request.data.GetServicesConfigPayload; + +/** + * Represents a request for getting configurations for services from "configs" table inside + * sidecar internal keyspace. + */ +public class GetServiceConfigRequest extends JsonRequest<GetServicesConfigPayload> +{ + /** + * Constructs a Sidecar request with the given {@code requestURI}. Defaults to {@code ssl} enabled. + */ + public GetServiceConfigRequest() + { + super("/api/v1/services"); Review Comment: Use the constant here? ```suggestion super(ApiEndpointsV1.SERVICES_CONFIG_ROUTE); ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/GetServiceConfigRequest.java: ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.request.data.GetServicesConfigPayload; + +/** + * Represents a request for getting configurations for services from "configs" table inside + * sidecar internal keyspace. + */ +public class GetServiceConfigRequest extends JsonRequest<GetServicesConfigPayload> Review Comment: NIT ```suggestion public class AllServicesConfigRequest extends JsonRequest<GetServicesConfigPayload> ``` ########## client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java: ########## @@ -544,6 +550,33 @@ public void streamCdcSegments(SidecarInstance sidecarInstance, .build(), streamConsumer); } + /** + * Get configs for all the services in the "configs" table inside sidecar's internal + * keyspace + * + * @return List of services and their corresponding configs + */ + public CompletableFuture<GetServicesConfigPayload> getServiceConfig() Review Comment: NIT: ```suggestion public CompletableFuture<GetServicesConfigPayload> allServicesConfig() ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/ServiceConfig.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request; + +/** + * "configs" table inside internal sidecar keyspace stores configs for Kafka and CDC. ServiceConfig identifies + * the service of the configs. + */ +public enum ServiceConfig +{ + CDC("cdc"), + KAFKA("kafka"); + + private final String serviceConfig; + + ServiceConfig(String serviceConfig) + { + this.serviceConfig = serviceConfig; + } + + public String getServiceConfig() Review Comment: NIT: ```suggestion public String serviceConfig() ``` ########## client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java: ########## @@ -544,6 +550,33 @@ public void streamCdcSegments(SidecarInstance sidecarInstance, .build(), streamConsumer); } + /** + * Get configs for all the services in the "configs" table inside sidecar's internal + * keyspace + * + * @return List of services and their corresponding configs + */ + public CompletableFuture<GetServicesConfigPayload> getServiceConfig() + { + return executor.executeRequestAsync(requestBuilder() + .request(new GetServiceConfigRequest()) + .build()); + } + + /** + * Update config for a given service in "configs" table in internal sidecar keyspace + * + * @param serviceConfig service for which the configs are being updated + * @param config the updated config + * @return updated config + */ + public CompletableFuture<PutCdcServiceConfigPayload> putCdcServiceConfig(ServiceConfig serviceConfig, Map<String, String> config) Review Comment: NIT: ```suggestion public CompletableFuture<PutCdcServiceConfigPayload> updateServiceConfig(ServiceConfig serviceConfig, Map<String, String> config) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final int DEFAULT_MAX_WATERMARKER_SIZE = 400000; + private static final String DEFAULT_JOB_ID = "test-job-id"; + private static final int DEFAULT_MAX_COMMITLOGS_PER_INSTANCE = 4; + private static final int DEFAULT_MAX_RECORD_BYTE_SIZE = -1; + private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; + private final CdcConfiguration cdcConfiguration; + private final CdcConfigAccessor cdcConfigAccessor; + private final KafkaConfigAccessor kafkaConfigAccessor; + private final List<ThrowingRunnable> configChangeListeners = Collections.synchronizedList(new ArrayList<>()); + private final ConfigRefreshNotifier configRefreshNotifier; + private volatile Map<String, String> kafkaConfigMappings = Map.of(); + private volatile Map<String, String> cdcConfigMappings = Map.of(); + + @Inject + public CdcConfigImpl(CdcConfiguration cdcConfiguration, + SchemaKeyspaceConfiguration schemaKeyspaceConfiguration, + CdcConfigAccessor cdcConfigAccessor, + KafkaConfigAccessor kafkaConfigAccessor, + PeriodicTaskExecutor periodicTaskExecutor) + { + this.schemaKeyspaceConfiguration = schemaKeyspaceConfiguration; + this.cdcConfiguration = cdcConfiguration; + this.cdcConfigAccessor = cdcConfigAccessor; + this.kafkaConfigAccessor = kafkaConfigAccessor; + + if (this.schemaKeyspaceConfiguration.isEnabled()) + { + this.configRefreshNotifier = new ConfigRefreshNotifier(); + periodicTaskExecutor.schedule(configRefreshNotifier); + } + else + { + this.configRefreshNotifier = null; + } + } + + @Override + public Map<String, Object> kafkaConfigs() + { + Map<String, Object> kafkaConfigs = new HashMap<>(); + kafkaConfigs.putAll(getAuthConfigs()); + kafkaConfigs.putAll(kafkaConfigMappings); + return ImmutableMap.copyOf(kafkaConfigs); + } + + @Override + public Map<String, Object> cdcConfigs() + { + return ImmutableMap.copyOf(cdcConfigMappings); + } + + @Override + public boolean isConfigReady() + { + return cdcConfigAccessor.isSchemaInitialized() + && !kafkaConfigMappings.isEmpty() + && !cdcConfigMappings.isEmpty(); + } + + @Override + public String kafkaTopic() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.TOPIC.lowcaseName, null); Review Comment: NIT, there's no need to default to null here ```suggestion return cdcConfigMappings.get(ConfigKeys.TOPIC.lowcaseName); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.Map; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * In memory representation of CDC and Kafka configurations from "configs" table in sidecar keyspace. + */ +public interface CdcConfig +{ + /** + * @return returns environment of the cassandra cluster. This config could be optional + */ + String env(); + + /** + * @return returns kafka topic for the mutations to be published + */ + @Nullable + String kafkaTopic(); + + /** + * @return returns topic formats + */ + @NotNull + TopicFormatType topicFormat(); + + /** + * @return returns if CDC is enabled or not + */ + boolean cdcEnabled(); + + /** + * @return returns unique global identifier for CDC job, CDC state is associated with job-id + */ + String jobId(); + + /** + * @return returns configurations of the kafka for the mutations to be published. + */ + Map<String, Object> kafkaConfigs(); + + /** + * @return returns CDC configurations as a map + */ + Map<String, Object> cdcConfigs(); + + /** + * @return if logOnly config is set, mutations will not be published to kafka, instead they + * would be logged, this would be useful for debugging and running sidecar application locally + */ + boolean logOnly(); + + /** + * @return returns the data center, this config could be optional + */ + String dc(); Review Comment: NIT: for consistency with the codebase ```suggestion String datacenter(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final int DEFAULT_MAX_WATERMARKER_SIZE = 400000; + private static final String DEFAULT_JOB_ID = "test-job-id"; + private static final int DEFAULT_MAX_COMMITLOGS_PER_INSTANCE = 4; + private static final int DEFAULT_MAX_RECORD_BYTE_SIZE = -1; + private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; + private final CdcConfiguration cdcConfiguration; + private final CdcConfigAccessor cdcConfigAccessor; + private final KafkaConfigAccessor kafkaConfigAccessor; + private final List<ThrowingRunnable> configChangeListeners = Collections.synchronizedList(new ArrayList<>()); + private final ConfigRefreshNotifier configRefreshNotifier; + private volatile Map<String, String> kafkaConfigMappings = Map.of(); + private volatile Map<String, String> cdcConfigMappings = Map.of(); Review Comment: should we use case insensitive maps here? (i.e TreeMap) to avoid having to lowercase during access? ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessor.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +/** + * CDC configs are stored inside "configs" table of sidecar keyspace. This is an interface for + * database accessor of "configs" table + */ +public interface ConfigAccessor +{ + /** + * Gets the configs of a service + * @return returns configs for the current service + */ + ServiceConfig getConfig(); + + /** + * Persists configs into the "configs" table for the current service + * + * @param config configs to be persisted + * @return returns updated configs + */ + ServiceConfig storeConfig(final Map<String, String> config); + + /** + * Stores configs of the current service if they are not already present + * + * @param config new configs + * @return updated configs + */ + Optional<ServiceConfig> storeConfigIfNotExists(final Map<String, String> config); Review Comment: unused, can it be removed? ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessor.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +/** + * CDC configs are stored inside "configs" table of sidecar keyspace. This is an interface for + * database accessor of "configs" table + */ +public interface ConfigAccessor +{ + /** + * Gets the configs of a service + * @return returns configs for the current service + */ + ServiceConfig getConfig(); + + /** + * Persists configs into the "configs" table for the current service + * + * @param config configs to be persisted + * @return returns updated configs + */ + ServiceConfig storeConfig(final Map<String, String> config); + + /** + * Stores configs of the current service if they are not already present + * + * @param config new configs + * @return updated configs + */ + Optional<ServiceConfig> storeConfigIfNotExists(final Map<String, String> config); + + /** + * Deletes configs for the given service + */ + void deleteConfig(); + + /** + * Checks if the schema for configs table is initialized + * @return + */ + boolean isSchemaInitialized(); Review Comment: this method is actually not needed anymore. We could instead override the `org.apache.cassandra.sidecar.db.DatabaseAccessor#isAvailable` in `org.apache.cassandra.sidecar.db.ConfigAccessorImpl` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessorImpl.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.ConfigsSchema; +import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.routes.cdc.Service; + +/** + * Configurations for CDC feature are stored inside a table "config" in an internal sidecar keyspace. + * {@link ConfigAccessorImpl} is an accessor for the above-mentioned table and encapsulates database + * access operations of the "config" table. + */ +public abstract class ConfigAccessorImpl extends DatabaseAccessor<ConfigsSchema> implements ConfigAccessor +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigAccessorImpl.class); + private final Service service = service(); + private final SidecarSchema sidecarSchema; + + protected ConfigAccessorImpl(ConfigsSchema configsSchema, + CQLSessionProvider sessionProvider, + SidecarSchema sidecarSchema) + { + super(configsSchema, sessionProvider); + this.sidecarSchema = sidecarSchema; + } + + public abstract Service service(); + + @Override + public ServiceConfig getConfig() + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema.selectConfig() + .bind(service.serviceName); + Row row = execute(statement).one(); + if (row == null || row.isNull(0)) + { + LOGGER.debug(String.format("No %s configs are present in the table C* table", service.serviceName)); + return new ServiceConfig(Map.of()); + } + return ServiceConfig.from(row); + } + + @Override + public ServiceConfig storeConfig(Map<String, String> config) + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema.insertConfig() + .bind(service.serviceName, config); + execute(statement); + return new ServiceConfig(config); + } + + @Override + public Optional<ServiceConfig> storeConfigIfNotExists(Map<String, String> config) + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema.insertConfigIfNotExists() + .bind(service.serviceName, config); + ResultSet resultSet = execute(statement); + if (resultSet.wasApplied()) + { + return Optional.of(new ServiceConfig(config)); + } + return Optional.empty(); + } + + @Override + public void deleteConfig() + { + sidecarSchema.ensureInitialized(); + BoundStatement deleteStatement = tableSchema.deleteConfig() + .bind(service.serviceName); + execute(deleteStatement); + } + + @Override + public boolean isSchemaInitialized() + { + return sidecarSchema.isInitialized(); + } Review Comment: override isAvailable instead ```suggestion @Override public boolean isAvailable() { return super.isAvailable() && sidecarSchema.isInitialized(); } ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ServiceConfig.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.HashMap; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import com.datastax.driver.core.Row; +import org.jetbrains.annotations.Nullable; + +/** + * In-memory representation of service configs stored in "configs" table in C* + */ +public class ServiceConfig +{ + private final Map<String, String> serviceConfig; + + public ServiceConfig() + { + this.serviceConfig = new HashMap<>(); + } + + public ServiceConfig(Map<String, String> serviceConfig) + { + this.serviceConfig = serviceConfig; Review Comment: ```suggestion this.serviceConfig = Map.copyOf(serviceConfig); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ServiceConfig.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.HashMap; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import com.datastax.driver.core.Row; +import org.jetbrains.annotations.Nullable; + +/** + * In-memory representation of service configs stored in "configs" table in C* + */ +public class ServiceConfig +{ + private final Map<String, String> serviceConfig; + + public ServiceConfig() + { + this.serviceConfig = new HashMap<>(); + } + + public ServiceConfig(Map<String, String> serviceConfig) + { + this.serviceConfig = serviceConfig; + } + + public static ServiceConfig from(@Nullable Row row) + { + if (row == null || row.isNull(0)) + { + return new ServiceConfig(); + } + Map<String, String> configMap = row.getMap("config", String.class, String.class); + return new ServiceConfig(configMap); + } + + public Map<String, String> getConfigs() + { + return ImmutableMap.copyOf(serviceConfig); Review Comment: NIT: prefer JDK apis ```suggestion return Map.copyOf(serviceConfig); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/Service.java: ########## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Locale; + +/** + * Enum representing various services inside config table in sidecar internal keyspace. + */ +public enum Service Review Comment: do we really need to have this enum as well as `org.apache.cassandra.sidecar.common.request.ServiceConfig` which are practically the same? can we consolidate? ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java: ########## @@ -121,6 +121,12 @@ public final class ApiEndpointsV1 public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + "/segments"; public static final String STREAM_CDC_SEGMENTS_ROUTE = LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM; + public static final String SERVICES_PATH = "/services"; + public static final String SERVICE_PARAM = ":service"; + public static final String CONFIG = "/config"; + public static final String SERVICE_CONFIG_ROUTE = API_V1 + SERVICES_PATH + SERVICE_PARAM + CONFIG; Review Comment: There's a bug here ```suggestion public static final String SERVICE_CONFIG_ROUTE = API_V1 + SERVICES_PATH + "/" + SERVICE_PARAM + CONFIG;; ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/GetServicesConfigPayload.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A class representing a response for the GetServicesConfig api "/api/v1/services" which contains + * configs for all the services in the "configs" table. + */ +public class GetServicesConfigPayload Review Comment: NIT ```suggestion public class AllServicesConfigPayload ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/PutServiceConfigRequest.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.request.data.PutCdcServiceConfigPayload; + +/** + * Request class for updating the config of a given service. + * Ex: updating kafka configs or updating cdc configs + */ +public class PutServiceConfigRequest extends JsonRequest<PutCdcServiceConfigPayload> +{ + final PutCdcServiceConfigPayload payload; + + public PutServiceConfigRequest(ServiceConfig serviceConfig, PutCdcServiceConfigPayload payload) + { + super(String.format("/api/v1/services/%s/config", serviceConfig.getServiceConfig())); Review Comment: Use constant here? ```suggestion super(ApiEndpointsV1.SERVICE_CONFIG_ROUTE.replaceAll(ApiEndpointsV1.SERVICE_PARAM, serviceConfig.serviceConfig())); ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/PutCdcServiceConfigPayload.java: ########## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.Map; +import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A class representing a response for the {@link PutCdcServiceConfigPayload}. + */ +public class PutCdcServiceConfigPayload Review Comment: Is this CDC-specific, or can it be used to update Kafka configurations? Also, NIT: ```suggestion public class UpdateServiceConfigPayload ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/PutServiceConfigRequest.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request; + +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.request.data.PutCdcServiceConfigPayload; + +/** + * Request class for updating the config of a given service. + * Ex: updating kafka configs or updating cdc configs + */ +public class PutServiceConfigRequest extends JsonRequest<PutCdcServiceConfigPayload> Review Comment: NIT: ```suggestion public class UpdateServiceConfigRequest extends JsonRequest<PutCdcServiceConfigPayload> ``` ########## client-common/src/main/java/org/apache/cassandra/sidecar/common/request/data/GetServicesConfigPayload.java: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.common.request.data; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A class representing a response for the GetServicesConfig api "/api/v1/services" which contains + * configs for all the services in the "configs" table. Review Comment: NIT ```suggestion * configurations for all the services in the "configs" table. ``` ########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java: ########## @@ -1624,6 +1629,44 @@ public void onError(Throwable throwable) assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testsGetServiceSuccessTests() throws IOException, ExecutionException, InterruptedException + { + List<GetServicesConfigPayload.Service> services = new ArrayList<>(); + Map<String, String> kafkaConfigs = new HashMap<>(); + kafkaConfigs.put("k1", "v1"); + kafkaConfigs.put("k2", "v2"); + Map<String, String> cdcConfigs = new HashMap<>(); + cdcConfigs.put("k1", "v1"); + cdcConfigs.put("k2", "v2"); + services.add(new GetServicesConfigPayload.Service("kafka", kafkaConfigs)); + services.add(new GetServicesConfigPayload.Service("cdc", cdcConfigs)); + GetServicesConfigPayload expectedResponse = new GetServicesConfigPayload(services); + + MockResponse response = new MockResponse(); + response.setResponseCode(200); + response.setHeader("content-type", "application/json"); + ObjectMapper mapper = new ObjectMapper(); + response.setBody(mapper.writeValueAsString(expectedResponse)); + enqueue(response); + assertThat(client.getServiceConfig().get()).isEqualTo(expectedResponse); + } + + @Test + public void testsPutCdcServiceSuccessTests() throws IOException, ExecutionException, InterruptedException + { + Map<String, String> payload = new HashMap<>(); + payload.put("testKey", "testValue"); + PutCdcServiceConfigPayload putResponse = new PutCdcServiceConfigPayload(payload); + MockResponse response = new MockResponse(); + response.setResponseCode(200); + response.setHeader("content-type", "application/json"); + ObjectMapper mapper = new ObjectMapper(); + response.setBody(mapper.writeValueAsString(putResponse)); + enqueue(response); + assertThat(client.putCdcServiceConfig(ServiceConfig.CDC, payload).get()).isEqualTo(putResponse); Review Comment: also, I think it's worth asserting the PUT here ########## server/src/test/resources/config/sidecar_cdc.yaml: ########## @@ -58,6 +58,8 @@ sidecar: max_execution_time: 15m # 15 minutes cdc: segment_hardlink_cache_expiry: 1m # 1 minute + is_enabled: true Review Comment: for consistency let's use `enabled` instead. Also this should be included in `conf/sidecar.yaml` with proper documentation ```suggestion enabled: true ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidator.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidator} has static + * utility methods for some of those validations. + */ +@Singleton +public class ServiceConfigValidator +{ + public Service validateAndGet(String requestService) + { + try + { + return Service.withName(requestService); + } + catch (Exception e) + { + Set<String> services = Stream.of(Service.values()).map(v -> v.serviceName).collect(Collectors.toSet()); + String supportedServices = String.join(", ", services); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid service passed. Supported services: " + + supportedServices); + } + } + + public void validateConfig(JsonObject payload) + { + try + { + payload.getJsonObject(ConfigPayloadParams.CONFIG).getMap(); + } + catch (final ClassCastException ex) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid config passed"); + } + } + + public void validatePayload(JsonObject payload) + { + if (!payload.containsKey(ConfigPayloadParams.CONFIG)) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid request payload. " + + "config needs to be passed"); Review Comment: NIT ```suggestion + "config needs to be provided"); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.Map; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * In memory representation of CDC and Kafka configurations from "configs" table in sidecar keyspace. + */ +public interface CdcConfig +{ + /** + * @return returns environment of the cassandra cluster. This config could be optional + */ + String env(); + + /** + * @return returns kafka topic for the mutations to be published + */ + @Nullable + String kafkaTopic(); + + /** + * @return returns topic formats + */ + @NotNull + TopicFormatType topicFormat(); + + /** + * @return returns if CDC is enabled or not + */ + boolean cdcEnabled(); + + /** + * @return returns unique global identifier for CDC job, CDC state is associated with job-id + */ + String jobId(); + + /** + * @return returns configurations of the kafka for the mutations to be published. + */ + Map<String, Object> kafkaConfigs(); + + /** + * @return returns CDC configurations as a map + */ + Map<String, Object> cdcConfigs(); + + /** + * @return if logOnly config is set, mutations will not be published to kafka, instead they + * would be logged, this would be useful for debugging and running sidecar application locally + */ + boolean logOnly(); + + /** + * @return returns the data center, this config could be optional + */ + String dc(); + + /** + * @return watermark window + */ + MinuteBoundConfiguration watermarkWindow(); + + /** + * @return max Kafka record size in bytes. If value is non-negative then the KafkaPublisher will chunk larger records into multiple messages. + */ + int maxRecordSizeBytes(); + + /** + * @return "zstd" to enable compression on large blobs, or null or empty string if disabled. + */ + @Nullable + String compression(); + + /** + * @return true if Kafka publisher should fail if Kafka client returns "record too large" error + */ + boolean failOnRecordTooLargeError(); + + /** + * @return true if Kafka publisher should fail if Kafka client returns any other error. + */ + boolean failOnKafkaError(); + + /** + * Initialization of tables and loading config takes some time, returns if the config + * is ready to be loaded or not. + * + * @return true if config is ready to be read. + */ + boolean isConfigReady(); + + MillisecondBoundConfiguration minDelayBetweenMicroBatches(); + + int maxCommitLogsPerInstance(); Review Comment: can we add javadocs here? ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java: ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.Map; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * In memory representation of CDC and Kafka configurations from "configs" table in sidecar keyspace. + */ +public interface CdcConfig +{ + /** + * @return returns environment of the cassandra cluster. This config could be optional + */ + String env(); + + /** + * @return returns kafka topic for the mutations to be published + */ + @Nullable + String kafkaTopic(); + + /** + * @return returns topic formats + */ + @NotNull + TopicFormatType topicFormat(); + + /** + * @return returns if CDC is enabled or not + */ + boolean cdcEnabled(); + + /** + * @return returns unique global identifier for CDC job, CDC state is associated with job-id + */ + String jobId(); + + /** + * @return returns configurations of the kafka for the mutations to be published. + */ + Map<String, Object> kafkaConfigs(); + + /** + * @return returns CDC configurations as a map + */ + Map<String, Object> cdcConfigs(); + + /** + * @return if logOnly config is set, mutations will not be published to kafka, instead they + * would be logged, this would be useful for debugging and running sidecar application locally + */ + boolean logOnly(); + + /** + * @return returns the data center, this config could be optional + */ + String dc(); + + /** + * @return watermark window + */ + MinuteBoundConfiguration watermarkWindow(); + + /** + * @return max Kafka record size in bytes. If value is non-negative then the KafkaPublisher will chunk larger records into multiple messages. + */ + int maxRecordSizeBytes(); + + /** + * @return "zstd" to enable compression on large blobs, or null or empty string if disabled. + */ + @Nullable + String compression(); + + /** + * @return true if Kafka publisher should fail if Kafka client returns "record too large" error + */ + boolean failOnRecordTooLargeError(); + + /** + * @return true if Kafka publisher should fail if Kafka client returns any other error. + */ + boolean failOnKafkaError(); + + /** + * Initialization of tables and loading config takes some time, returns if the config + * is ready to be loaded or not. + * + * @return true if config is ready to be read. + */ + boolean isConfigReady(); + + MillisecondBoundConfiguration minDelayBetweenMicroBatches(); Review Comment: can we add javadocs here? ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final int DEFAULT_MAX_WATERMARKER_SIZE = 400000; + private static final String DEFAULT_JOB_ID = "test-job-id"; + private static final int DEFAULT_MAX_COMMITLOGS_PER_INSTANCE = 4; + private static final int DEFAULT_MAX_RECORD_BYTE_SIZE = -1; + private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; + private final CdcConfiguration cdcConfiguration; + private final CdcConfigAccessor cdcConfigAccessor; + private final KafkaConfigAccessor kafkaConfigAccessor; + private final List<ThrowingRunnable> configChangeListeners = Collections.synchronizedList(new ArrayList<>()); + private final ConfigRefreshNotifier configRefreshNotifier; + private volatile Map<String, String> kafkaConfigMappings = Map.of(); + private volatile Map<String, String> cdcConfigMappings = Map.of(); + + @Inject + public CdcConfigImpl(CdcConfiguration cdcConfiguration, + SchemaKeyspaceConfiguration schemaKeyspaceConfiguration, + CdcConfigAccessor cdcConfigAccessor, + KafkaConfigAccessor kafkaConfigAccessor, + PeriodicTaskExecutor periodicTaskExecutor) + { + this.schemaKeyspaceConfiguration = schemaKeyspaceConfiguration; + this.cdcConfiguration = cdcConfiguration; + this.cdcConfigAccessor = cdcConfigAccessor; + this.kafkaConfigAccessor = kafkaConfigAccessor; + + if (this.schemaKeyspaceConfiguration.isEnabled()) + { + this.configRefreshNotifier = new ConfigRefreshNotifier(); + periodicTaskExecutor.schedule(configRefreshNotifier); + } + else + { + this.configRefreshNotifier = null; + } + } + + @Override + public Map<String, Object> kafkaConfigs() + { + Map<String, Object> kafkaConfigs = new HashMap<>(); + kafkaConfigs.putAll(getAuthConfigs()); + kafkaConfigs.putAll(kafkaConfigMappings); + return ImmutableMap.copyOf(kafkaConfigs); + } + + @Override + public Map<String, Object> cdcConfigs() + { + return ImmutableMap.copyOf(cdcConfigMappings); + } + + @Override + public boolean isConfigReady() + { + return cdcConfigAccessor.isSchemaInitialized() + && !kafkaConfigMappings.isEmpty() + && !cdcConfigMappings.isEmpty(); + } + + @Override + public String kafkaTopic() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.TOPIC.lowcaseName, null); + } + + @NotNull + public TopicFormatType topicFormat() + { + return TopicFormatType.valueOf(cdcConfigMappings.getOrDefault(ConfigKeys.TOPIC_FORMAT_TYPE.lowcaseName, TopicFormatType.STATIC.name())); + } + + public boolean cdcEnabled() + { + return Boolean.parseBoolean(cdcConfigMappings.getOrDefault(ConfigKeys.CDC_ENABLED.lowcaseName, "true")); + } + + @Override + public String jobId() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.JOBID.lowcaseName, DEFAULT_JOB_ID); + } + + @Override + public boolean logOnly() + { + return getBool(ConfigKeys.LOG_ONLY.lowcaseName, false); + } + + @Override + public boolean persistEnabled() + { + return getBool(ConfigKeys.PERSIST_STATE.lowcaseName, true); + } + + @Override + public boolean failOnRecordTooLargeError() + { + return getBool(ConfigKeys.FAIL_KAFKA_TOO_LARGE_ERRORS.lowcaseName, false); + } + + @Override + public boolean failOnKafkaError() + { + return getBool(ConfigKeys.FAIL_KAFKA_ERRORS.lowcaseName, true); + } + + @Override + public MillisecondBoundConfiguration persistDelay() + { + return new MillisecondBoundConfiguration(getInt(ConfigKeys.PERSIST_DELAY_MILLIS.lowcaseName, 1000), TimeUnit.SECONDS); + } + + @Override + public String dc() + { + return cdcConfigMappings.get(ConfigKeys.DC.lowcaseName); + } + + @Override + public MinuteBoundConfiguration watermarkWindow() + { + // this prop sets the maximum duration age accepted by CDC, any mutations with write timestamps older than + // the watermark window will be dropped with log message "Exclude the update due to out of the allowed time window." + return new MinuteBoundConfiguration(getInt(ConfigKeys.WATERMARK_SECONDS.lowcaseName, 259200), TimeUnit.SECONDS); + } + + @Override + public int maxRecordSizeBytes() + { + return DEFAULT_MAX_RECORD_BYTE_SIZE; + } + + @Override + public @Nullable String compression() Review Comment: is this not coming from the configuration map? ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final int DEFAULT_MAX_WATERMARKER_SIZE = 400000; + private static final String DEFAULT_JOB_ID = "test-job-id"; + private static final int DEFAULT_MAX_COMMITLOGS_PER_INSTANCE = 4; + private static final int DEFAULT_MAX_RECORD_BYTE_SIZE = -1; + private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; + private final CdcConfiguration cdcConfiguration; + private final CdcConfigAccessor cdcConfigAccessor; + private final KafkaConfigAccessor kafkaConfigAccessor; + private final List<ThrowingRunnable> configChangeListeners = Collections.synchronizedList(new ArrayList<>()); + private final ConfigRefreshNotifier configRefreshNotifier; + private volatile Map<String, String> kafkaConfigMappings = Map.of(); + private volatile Map<String, String> cdcConfigMappings = Map.of(); + + @Inject + public CdcConfigImpl(CdcConfiguration cdcConfiguration, + SchemaKeyspaceConfiguration schemaKeyspaceConfiguration, + CdcConfigAccessor cdcConfigAccessor, + KafkaConfigAccessor kafkaConfigAccessor, + PeriodicTaskExecutor periodicTaskExecutor) + { + this.schemaKeyspaceConfiguration = schemaKeyspaceConfiguration; + this.cdcConfiguration = cdcConfiguration; + this.cdcConfigAccessor = cdcConfigAccessor; + this.kafkaConfigAccessor = kafkaConfigAccessor; + + if (this.schemaKeyspaceConfiguration.isEnabled()) + { + this.configRefreshNotifier = new ConfigRefreshNotifier(); + periodicTaskExecutor.schedule(configRefreshNotifier); + } + else + { + this.configRefreshNotifier = null; + } + } + + @Override + public Map<String, Object> kafkaConfigs() + { + Map<String, Object> kafkaConfigs = new HashMap<>(); + kafkaConfigs.putAll(getAuthConfigs()); + kafkaConfigs.putAll(kafkaConfigMappings); + return ImmutableMap.copyOf(kafkaConfigs); + } + + @Override + public Map<String, Object> cdcConfigs() + { + return ImmutableMap.copyOf(cdcConfigMappings); + } + + @Override + public boolean isConfigReady() + { + return cdcConfigAccessor.isSchemaInitialized() + && !kafkaConfigMappings.isEmpty() + && !cdcConfigMappings.isEmpty(); + } + + @Override + public String kafkaTopic() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.TOPIC.lowcaseName, null); + } + + @NotNull + public TopicFormatType topicFormat() + { + return TopicFormatType.valueOf(cdcConfigMappings.getOrDefault(ConfigKeys.TOPIC_FORMAT_TYPE.lowcaseName, TopicFormatType.STATIC.name())); + } + + public boolean cdcEnabled() + { + return Boolean.parseBoolean(cdcConfigMappings.getOrDefault(ConfigKeys.CDC_ENABLED.lowcaseName, "true")); + } + + @Override + public String jobId() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.JOBID.lowcaseName, DEFAULT_JOB_ID); + } + + @Override + public boolean logOnly() + { + return getBool(ConfigKeys.LOG_ONLY.lowcaseName, false); + } + + @Override + public boolean persistEnabled() + { + return getBool(ConfigKeys.PERSIST_STATE.lowcaseName, true); + } + + @Override + public boolean failOnRecordTooLargeError() + { + return getBool(ConfigKeys.FAIL_KAFKA_TOO_LARGE_ERRORS.lowcaseName, false); + } + + @Override + public boolean failOnKafkaError() + { + return getBool(ConfigKeys.FAIL_KAFKA_ERRORS.lowcaseName, true); + } + + @Override + public MillisecondBoundConfiguration persistDelay() + { + return new MillisecondBoundConfiguration(getInt(ConfigKeys.PERSIST_DELAY_MILLIS.lowcaseName, 1000), TimeUnit.SECONDS); + } + + @Override + public String dc() + { + return cdcConfigMappings.get(ConfigKeys.DC.lowcaseName); + } + + @Override + public MinuteBoundConfiguration watermarkWindow() + { + // this prop sets the maximum duration age accepted by CDC, any mutations with write timestamps older than + // the watermark window will be dropped with log message "Exclude the update due to out of the allowed time window." + return new MinuteBoundConfiguration(getInt(ConfigKeys.WATERMARK_SECONDS.lowcaseName, 259200), TimeUnit.SECONDS); Review Comment: Should the value 259200 be defined as a constant? ########## server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java: ########## @@ -32,33 +33,58 @@ public class CdcConfigurationImpl implements CdcConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigurationImpl.class); + public static final String IS_ENABLED_PROPERTY = "is_enabled"; Review Comment: to maintain consistency with the existing configuration this should just be "enabled" ```suggestion public static final String IS_ENABLED_PROPERTY = "enabled"; ``` ########## server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java: ########## @@ -28,4 +29,16 @@ public interface CdcConfiguration * @return segment hard link cache expiration time used in {@link org.apache.cassandra.sidecar.cdc.CdcLogCache} */ SecondBoundConfiguration segmentHardLinkCacheExpiry(); + + /** + * + * @return returns if cdc feature is enabled Review Comment: ```suggestion * @return true if cdc feature is enabled ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessor.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +/** + * CDC configs are stored inside "configs" table of sidecar keyspace. This is an interface for + * database accessor of "configs" table + */ +public interface ConfigAccessor +{ + /** + * Gets the configs of a service + * @return returns configs for the current service + */ + ServiceConfig getConfig(); + + /** + * Persists configs into the "configs" table for the current service + * + * @param config configs to be persisted + * @return returns updated configs + */ + ServiceConfig storeConfig(final Map<String, String> config); + + /** + * Stores configs of the current service if they are not already present + * + * @param config new configs + * @return updated configs + */ + Optional<ServiceConfig> storeConfigIfNotExists(final Map<String, String> config); + + /** + * Deletes configs for the given service Review Comment: If I understand this correctly this deletes all the configurations for the given service ```suggestion * Deletes all the configurations for the given service ``` ########## server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfigImpl.java: ########## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.cdc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; + +import org.apache.cassandra.sidecar.common.server.ThrowingRunnable; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration; +import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.db.CdcConfigAccessor; +import org.apache.cassandra.sidecar.db.KafkaConfigAccessor; +import org.apache.cassandra.sidecar.tasks.PeriodicTask; +import org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; +import org.apache.cassandra.sidecar.tasks.ScheduleDecision; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Implementation of the interface {@link CdcConfig}, an in-memory representation holding + * CDC and Kafka configurations from "configs" table inside sidecar internal keyspace. + */ +@Singleton +public class CdcConfigImpl implements CdcConfig +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcConfigImpl.class); + private static final int DEFAULT_MAX_WATERMARKER_SIZE = 400000; + private static final String DEFAULT_JOB_ID = "test-job-id"; + private static final int DEFAULT_MAX_COMMITLOGS_PER_INSTANCE = 4; + private static final int DEFAULT_MAX_RECORD_BYTE_SIZE = -1; + private final SchemaKeyspaceConfiguration schemaKeyspaceConfiguration; + private final CdcConfiguration cdcConfiguration; + private final CdcConfigAccessor cdcConfigAccessor; + private final KafkaConfigAccessor kafkaConfigAccessor; + private final List<ThrowingRunnable> configChangeListeners = Collections.synchronizedList(new ArrayList<>()); + private final ConfigRefreshNotifier configRefreshNotifier; + private volatile Map<String, String> kafkaConfigMappings = Map.of(); + private volatile Map<String, String> cdcConfigMappings = Map.of(); + + @Inject + public CdcConfigImpl(CdcConfiguration cdcConfiguration, + SchemaKeyspaceConfiguration schemaKeyspaceConfiguration, + CdcConfigAccessor cdcConfigAccessor, + KafkaConfigAccessor kafkaConfigAccessor, + PeriodicTaskExecutor periodicTaskExecutor) + { + this.schemaKeyspaceConfiguration = schemaKeyspaceConfiguration; + this.cdcConfiguration = cdcConfiguration; + this.cdcConfigAccessor = cdcConfigAccessor; + this.kafkaConfigAccessor = kafkaConfigAccessor; + + if (this.schemaKeyspaceConfiguration.isEnabled()) + { + this.configRefreshNotifier = new ConfigRefreshNotifier(); + periodicTaskExecutor.schedule(configRefreshNotifier); + } + else + { + this.configRefreshNotifier = null; + } + } + + @Override + public Map<String, Object> kafkaConfigs() + { + Map<String, Object> kafkaConfigs = new HashMap<>(); + kafkaConfigs.putAll(getAuthConfigs()); + kafkaConfigs.putAll(kafkaConfigMappings); + return ImmutableMap.copyOf(kafkaConfigs); + } + + @Override + public Map<String, Object> cdcConfigs() + { + return ImmutableMap.copyOf(cdcConfigMappings); + } + + @Override + public boolean isConfigReady() + { + return cdcConfigAccessor.isSchemaInitialized() + && !kafkaConfigMappings.isEmpty() + && !cdcConfigMappings.isEmpty(); + } + + @Override + public String kafkaTopic() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.TOPIC.lowcaseName, null); + } + + @NotNull + public TopicFormatType topicFormat() + { + return TopicFormatType.valueOf(cdcConfigMappings.getOrDefault(ConfigKeys.TOPIC_FORMAT_TYPE.lowcaseName, TopicFormatType.STATIC.name())); + } + + public boolean cdcEnabled() + { + return Boolean.parseBoolean(cdcConfigMappings.getOrDefault(ConfigKeys.CDC_ENABLED.lowcaseName, "true")); + } + + @Override + public String jobId() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.JOBID.lowcaseName, DEFAULT_JOB_ID); + } + + @Override + public boolean logOnly() + { + return getBool(ConfigKeys.LOG_ONLY.lowcaseName, false); + } + + @Override + public boolean persistEnabled() + { + return getBool(ConfigKeys.PERSIST_STATE.lowcaseName, true); + } + + @Override + public boolean failOnRecordTooLargeError() + { + return getBool(ConfigKeys.FAIL_KAFKA_TOO_LARGE_ERRORS.lowcaseName, false); + } + + @Override + public boolean failOnKafkaError() + { + return getBool(ConfigKeys.FAIL_KAFKA_ERRORS.lowcaseName, true); + } + + @Override + public MillisecondBoundConfiguration persistDelay() + { + return new MillisecondBoundConfiguration(getInt(ConfigKeys.PERSIST_DELAY_MILLIS.lowcaseName, 1000), TimeUnit.SECONDS); + } + + @Override + public String dc() + { + return cdcConfigMappings.get(ConfigKeys.DC.lowcaseName); + } + + @Override + public MinuteBoundConfiguration watermarkWindow() + { + // this prop sets the maximum duration age accepted by CDC, any mutations with write timestamps older than + // the watermark window will be dropped with log message "Exclude the update due to out of the allowed time window." + return new MinuteBoundConfiguration(getInt(ConfigKeys.WATERMARK_SECONDS.lowcaseName, 259200), TimeUnit.SECONDS); + } + + @Override + public int maxRecordSizeBytes() + { + return DEFAULT_MAX_RECORD_BYTE_SIZE; + } + + @Override + public @Nullable String compression() + { + return null; + } + + @Override + public MillisecondBoundConfiguration minDelayBetweenMicroBatches() + { + // this prop allows us to add a minimum delay between CDC micro batches + // usually if we need to slow down CDC + // e.g. if CDC is started with a large backlog of commit log segments and is working hard to process. + // e.g. or if there is a large data dump or burst of writes that causes high CDC activity. + final long millis = Long.parseLong(cdcConfigMappings.getOrDefault(ConfigKeys.MICRO_BATCH_DELAY_MILLIS.lowcaseName, "1000")); + return new MillisecondBoundConfiguration(millis, TimeUnit.MILLISECONDS); + } + + @Override + public String env() + { + return cdcConfigMappings.getOrDefault(ConfigKeys.ENV.lowcaseName, ""); + } + + @Override + public int maxCommitLogsPerInstance() + { + return getInt(ConfigKeys.MAX_COMMIT_LOGS.lowcaseName, DEFAULT_MAX_COMMITLOGS_PER_INSTANCE); + } + + @Override + public int maxWatermarkerSize() + { + return getInt(ConfigKeys.MAX_WATERMARKER_SIZE.lowcaseName, DEFAULT_MAX_WATERMARKER_SIZE); + } + + protected boolean getBool(String key, boolean orDefault) + { + String bool = cdcConfigMappings.get(key); + return bool != null ? Boolean.parseBoolean(bool) : orDefault; + } + + protected int getInt(String key, int orDefault) + { + return getInt(key, () -> orDefault); + } + + protected int getInt(String key, Supplier<Integer> orDefault) + { + String aInt = cdcConfigMappings.get(key); + return aInt != null ? Integer.valueOf(aInt) : orDefault.get(); + } + + /** + * Adds a listener to service config changes + * + * @param listener The listener to call + */ + public void registerConfigChangeListener(ThrowingRunnable listener) + { + this.configChangeListeners.add(listener); + } + + private Map<String, Object> getAuthConfigs() + { + return new HashMap<>(); + } + + @VisibleForTesting + void forceExecuteNotifier() + { + if (configRefreshNotifier != null && + configRefreshNotifier.scheduleDecision() == ScheduleDecision.EXECUTE) + { + configRefreshNotifier.execute(Promise.promise()); + } + } + + @VisibleForTesting + ConfigRefreshNotifier configRefreshNotifier() + { + return configRefreshNotifier; + } + + class ConfigRefreshNotifier implements PeriodicTask + { + @Override + public DurationSpec delay() + { + return cdcConfiguration.cdcConfigRefreshTime(); + } + + @Override + public void execute(Promise<Void> promise) + { + for (ThrowingRunnable listener : configChangeListeners) + { + try + { + listener.run(); + } + catch (Throwable e) + { + LOGGER.error("There was an error with callback {}", listener, e); + } + } + promise.tryComplete(); + } + + // skip if any of the following condition is true + // - sidecar schema not enabled or cdc not enabled + // - both configs have not changed + @Override + public ScheduleDecision scheduleDecision() + { + if (!schemaKeyspaceConfiguration.isEnabled() || !cdcConfiguration.isEnabled()) + { + LOGGER.trace("Skipping config refreshing"); + return ScheduleDecision.SKIP; + } + + Map<String, String> newKafkaConfigMappings; + Map<String, String> newCdcConfigMappings; + try + { + newKafkaConfigMappings = kafkaConfigAccessor.getConfig().getConfigs(); + newCdcConfigMappings = cdcConfigAccessor.getConfig().getConfigs(); + } + catch (Throwable e) + { + LOGGER.error("Failed to access cdc/kafka configs", e); + return ScheduleDecision.SKIP; + } + + boolean shouldSkip = true; + if (!newKafkaConfigMappings.equals(kafkaConfigMappings)) + { + shouldSkip = false; + kafkaConfigMappings = newKafkaConfigMappings; + } + if (!newCdcConfigMappings.equals(cdcConfigMappings)) + { + shouldSkip = false; + cdcConfigMappings = newCdcConfigMappings; + } + return shouldSkip ? ScheduleDecision.SKIP : ScheduleDecision.EXECUTE; + } + } + + enum ConfigKeys + { + DC, Review Comment: NIT: DATACENTER ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessor.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +/** + * CDC configs are stored inside "configs" table of sidecar keyspace. This is an interface for + * database accessor of "configs" table + */ +public interface ConfigAccessor +{ + /** + * Gets the configs of a service + * @return returns configs for the current service + */ + ServiceConfig getConfig(); + + /** + * Persists configs into the "configs" table for the current service + * + * @param config configs to be persisted + * @return returns updated configs + */ + ServiceConfig storeConfig(final Map<String, String> config); Review Comment: NIT ```suggestion ServiceConfig storeConfig(Map<String, String> config); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessor.java: ########## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +/** + * CDC configs are stored inside "configs" table of sidecar keyspace. This is an interface for + * database accessor of "configs" table + */ +public interface ConfigAccessor +{ + /** + * Gets the configs of a service + * @return returns configs for the current service + */ + ServiceConfig getConfig(); + + /** + * Persists configs into the "configs" table for the current service + * + * @param config configs to be persisted + * @return returns updated configs + */ + ServiceConfig storeConfig(final Map<String, String> config); + + /** + * Stores configs of the current service if they are not already present + * + * @param config new configs + * @return updated configs + */ + Optional<ServiceConfig> storeConfigIfNotExists(final Map<String, String> config); + + /** + * Deletes configs for the given service + */ + void deleteConfig(); + + /** + * Checks if the schema for configs table is initialized + * @return Review Comment: ```suggestion * @return true if the schema for configs table is initialized ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ServiceConfig.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.HashMap; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import com.datastax.driver.core.Row; +import org.jetbrains.annotations.Nullable; + +/** + * In-memory representation of service configs stored in "configs" table in C* Review Comment: NIT ```suggestion * In-memory representation of service configs stored in "configs" table in Cassandra ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ConfigAccessorImpl.java: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; + +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.ConfigsSchema; +import org.apache.cassandra.sidecar.db.schema.SidecarSchema; +import org.apache.cassandra.sidecar.routes.cdc.Service; + +/** + * Configurations for CDC feature are stored inside a table "config" in an internal sidecar keyspace. + * {@link ConfigAccessorImpl} is an accessor for the above-mentioned table and encapsulates database + * access operations of the "config" table. + */ +public abstract class ConfigAccessorImpl extends DatabaseAccessor<ConfigsSchema> implements ConfigAccessor +{ + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigAccessorImpl.class); + private final Service service = service(); + private final SidecarSchema sidecarSchema; + + protected ConfigAccessorImpl(ConfigsSchema configsSchema, + CQLSessionProvider sessionProvider, + SidecarSchema sidecarSchema) + { + super(configsSchema, sessionProvider); + this.sidecarSchema = sidecarSchema; + } + + public abstract Service service(); + + @Override + public ServiceConfig getConfig() + { + sidecarSchema.ensureInitialized(); + BoundStatement statement = tableSchema.selectConfig() + .bind(service.serviceName); + Row row = execute(statement).one(); + if (row == null || row.isNull(0)) + { + LOGGER.debug(String.format("No %s configs are present in the table C* table", service.serviceName)); Review Comment: NIT ```suggestion LOGGER.debug(String.format("No %s configs are present in the table Cassandra table", service.serviceName)); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/ServiceConfig.java: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db; + +import java.util.HashMap; +import java.util.Map; +import com.google.common.collect.ImmutableMap; +import com.datastax.driver.core.Row; +import org.jetbrains.annotations.Nullable; + +/** + * In-memory representation of service configs stored in "configs" table in C* + */ +public class ServiceConfig +{ + private final Map<String, String> serviceConfig; + + public ServiceConfig() + { + this.serviceConfig = new HashMap<>(); Review Comment: This map should be immutable ```suggestion this.serviceConfig = Map.of(); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/db/schema/ConfigsSchema.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.KeyspaceMetadata; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration; +import org.apache.cassandra.sidecar.config.ServiceConfiguration; +import org.apache.cassandra.sidecar.coordination.ExecuteOnClusterLeaseholderOnly; +import org.jetbrains.annotations.NotNull; + +/** + * {@link ConfigsSchema} holds all prepared statements needed for talking to Cassandra for various actions + * related to storing and updating CDC and Kafka configs. + */ +@Singleton +public class ConfigsSchema extends TableSchema implements ExecuteOnClusterLeaseholderOnly +{ + private static final String CONFIGS_TABLE_NAME = "configs"; + private final SchemaKeyspaceConfiguration keyspaceConfig; + + // prepared statements + private PreparedStatement selectConfig; + private PreparedStatement insertConfig; + private PreparedStatement insertConfigIfNotExist; + private PreparedStatement deleteConfig; + + @Inject + public ConfigsSchema(ServiceConfiguration configuration) + { + this.keyspaceConfig = configuration.schemaKeyspaceConfiguration(); + } + + @Override + protected String keyspaceName() + { + return keyspaceConfig.keyspace(); + } + + @Override + protected void prepareStatements(@NotNull Session session) + { + selectConfig = prepare(selectConfig, session, CqlLiterals.selectConfig(keyspaceConfig)); + insertConfig = prepare(insertConfig, session, CqlLiterals.insertConfig(keyspaceConfig)); + insertConfigIfNotExist = prepare(insertConfigIfNotExist, session, CqlLiterals.insertConfigIfNotExist(keyspaceConfig)); + deleteConfig = prepare(deleteConfig, session, CqlLiterals.deleteConfig(keyspaceConfig)); + } + + @Override + protected String tableName() + { + return CONFIGS_TABLE_NAME; + } + + @Override + protected boolean exists(@NotNull Metadata metadata) + { + KeyspaceMetadata ksMetadata = metadata.getKeyspace(keyspaceConfig.keyspace()); + if (ksMetadata == null) + return false; + return ksMetadata.getTable(CONFIGS_TABLE_NAME) != null; + } + + @Override + protected String createSchemaStatement() + { + return String.format("CREATE TABLE IF NOT EXISTS %s.%s (" + + " service text," + + " config map<text, text>," + + " PRIMARY KEY (service))", + keyspaceConfig.keyspace(), CONFIGS_TABLE_NAME); + } + + public PreparedStatement selectConfig() + { + return selectConfig; + } + + public PreparedStatement insertConfig() + { + return insertConfig; + } + + public PreparedStatement insertConfigIfNotExists() + { + return insertConfigIfNotExist; + } + + public PreparedStatement deleteConfig() + { + return deleteConfig; + } + + private static class CqlLiterals + { + static String selectConfig(SchemaKeyspaceConfiguration config) + { + return withTable("SELECT config from %s.%s" + + " WHERE service=?", config); + } + + static String insertConfig(SchemaKeyspaceConfiguration config) + { + return withTable("INSERT INTO %s.%s (service, config)" + + " VALUES (?, ?)", config); + } + + static String insertConfigIfNotExist(SchemaKeyspaceConfiguration config) Review Comment: unused, should we remove? ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidator.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidator} has static + * utility methods for some of those validations. + */ +@Singleton +public class ServiceConfigValidator +{ + public Service validateAndGet(String requestService) + { + try + { + return Service.withName(requestService); + } + catch (Exception e) + { + Set<String> services = Stream.of(Service.values()).map(v -> v.serviceName).collect(Collectors.toSet()); + String supportedServices = String.join(", ", services); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid service passed. Supported services: " + + supportedServices); + } + } + + public void validateConfig(JsonObject payload) + { + try + { + payload.getJsonObject(ConfigPayloadParams.CONFIG).getMap(); + } + catch (final ClassCastException ex) Review Comment: NIT ```suggestion catch (ClassCastException ex) ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/GetServiceConfigHandler.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Handler; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.common.request.data.GetServicesConfigPayload; +import org.apache.cassandra.sidecar.db.ConfigAccessor; +import org.apache.cassandra.sidecar.db.ConfigAccessorFactory; +import org.apache.cassandra.sidecar.routes.AccessProtected; + +/** + * Provides REST endpoint for getting all the configs in "configs" table in sidecar internal keyspace. + */ +@Singleton +public class GetServiceConfigHandler implements Handler<RoutingContext>, AccessProtected Review Comment: NIT: `AllServicesConfigHandler` or `GetServicesConfigHandler` ```suggestion public class AllServicesConfigHandler implements Handler<RoutingContext>, AccessProtected ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidator.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidator} has static + * utility methods for some of those validations. + */ +@Singleton +public class ServiceConfigValidator +{ + public Service validateAndGet(String requestService) + { + try + { + return Service.withName(requestService); + } + catch (Exception e) + { + Set<String> services = Stream.of(Service.values()).map(v -> v.serviceName).collect(Collectors.toSet()); + String supportedServices = String.join(", ", services); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid service passed. Supported services: " + + supportedServices); + } + } + + public void validateConfig(JsonObject payload) + { + try + { + payload.getJsonObject(ConfigPayloadParams.CONFIG).getMap(); + } + catch (final ClassCastException ex) + { + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid config passed"); Review Comment: NIT: ```suggestion throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid configuration provided"); ``` ########## server/src/main/java/org/apache/cassandra/sidecar/routes/cdc/ServiceConfigValidator.java: ########## @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.sidecar.routes.cdc; + +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.google.inject.Singleton; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.json.JsonObject; + +import static org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException; + +/** + * Certain validations on payload and services have to be done before updating or deleting + * configs for services in "configs" table. {@link ServiceConfigValidator} has static + * utility methods for some of those validations. + */ +@Singleton +public class ServiceConfigValidator +{ + public Service validateAndGet(String requestService) + { + try + { + return Service.withName(requestService); + } + catch (Exception e) + { + Set<String> services = Stream.of(Service.values()).map(v -> v.serviceName).collect(Collectors.toSet()); + String supportedServices = String.join(", ", services); + throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid service passed. Supported services: " Review Comment: NIT ```suggestion throw wrapHttpException(HttpResponseStatus.BAD_REQUEST, "Invalid service provided. Supported services: " ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org For additional commands, e-mail: pr-h...@cassandra.apache.org