jyothsnakonisa commented on code in PR #294:
URL: https://github.com/apache/cassandra-sidecar/pull/294#discussion_r2600663155


##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable volatile TableSchemaPublisher publisher;

Review Comment:
   This could be private.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcManager.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.cdc.api.CdcOptions;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.api.TokenRangeSupplier;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcStats;
+import org.apache.cassandra.cdc.sidecar.SidecarStatePersister;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.utils.AsyncExecutor;
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Class handling CDC iterators

Review Comment:
   Please add better documentation.



##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -181,31 +216,4 @@ ResultSetFuture selectCdcRange(String jobId, int split)
     {
         return session().executeAsync(tableSchema.select().bind(jobId, (short) 
split));
     }
-
-    public ResultSetFuture insertTableSchemaHistory(String keyspace, String 
tableName, String schema)

Review Comment:
   Are you sure that you might not need these in the future?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaPublisher;
+import org.apache.cassandra.cdc.kafka.TopicSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
+/**
+ * Class that handles CDC life cycle
+ */
+@Singleton
+public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
+{
+    static final Logger LOGGER = LoggerFactory.getLogger(CdcPublisher.class);
+    static final long INITIALIZATION_LOOP_DELAY_MILLIS = 1000;

Review Comment:
   Please make them private.



##########
server/src/main/java/org/apache/cassandra/sidecar/db/CdcDatabaseAccessor.java:
##########
@@ -129,6 +138,32 @@ public List<ResultSetFuture> storeStateAsync(String jobId,
                      .collect(Collectors.toList());
     }
 
+    /**
+     * Load cdc state for a given jobId and token range and merge into 
canonical view
+     *
+     * @param stats SidecarCdcStats to publish metrics
+     * @param jobId Cdc job id
+     * @param range token range
+     * @return merged SidecarCdcState object that merges previous state 
objects that overlap with token range to given canonical view of Cdc state.
+     */
+    public Optional<CdcState> loadSidecarCdcState(SidecarCdcStats stats, 
String jobId, TokenRange range)

Review Comment:
   Looks like this is not used anywheere, I am assuming that it will be used 
later



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaPublisher;
+import org.apache.cassandra.cdc.kafka.TopicSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
+/**
+ * Class that handles CDC life cycle
+ */
+@Singleton
+public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
+{
+    static final Logger LOGGER = LoggerFactory.getLogger(CdcPublisher.class);
+    static final long INITIALIZATION_LOOP_DELAY_MILLIS = 1000;
+    private final Vertx vertx;
+    private final TaskExecutorPool executorPools;
+    private final CdcConfig conf;
+    private volatile boolean isRunning = false;
+    private volatile boolean isInitialized = false;
+    private volatile boolean cdcCacheWarmedUp = false;
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final VirtualTablesDatabaseAccessor virtualTables;
+    private final SidecarCdcStats sidecarCdcStats;
+    private RangeManager rangeManager;
+    private final TokenRingProvider tokenRingProvider;
+    private final SchemaSupplier schemaSupplier;
+    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
+    private final InstanceMetadataFetcher instanceMetadataFetcher;
+    private final ClusterConfigProvider clusterConfigProvider;
+    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final ICdcStats cdcStats;
+    private final SidecarConfiguration sidecarConfiguration;
+    private CdcManager cdcManager;
+    Serializer<CdcEvent> avroSerializer;

Review Comment:
   Please make this private



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization.

Review Comment:
   Should we add detailed documentation?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaPublisher;
+import org.apache.cassandra.cdc.kafka.TopicSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
+/**
+ * Class that handles CDC life cycle
+ */
+@Singleton
+public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
+{
+    static final Logger LOGGER = LoggerFactory.getLogger(CdcPublisher.class);
+    static final long INITIALIZATION_LOOP_DELAY_MILLIS = 1000;
+    private final Vertx vertx;
+    private final TaskExecutorPool executorPools;
+    private final CdcConfig conf;
+    private volatile boolean isRunning = false;
+    private volatile boolean isInitialized = false;
+    private volatile boolean cdcCacheWarmedUp = false;
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final VirtualTablesDatabaseAccessor virtualTables;
+    private final SidecarCdcStats sidecarCdcStats;
+    private RangeManager rangeManager;
+    private final TokenRingProvider tokenRingProvider;
+    private final SchemaSupplier schemaSupplier;
+    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
+    private final InstanceMetadataFetcher instanceMetadataFetcher;
+    private final ClusterConfigProvider clusterConfigProvider;
+    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final ICdcStats cdcStats;
+    private final SidecarConfiguration sidecarConfiguration;
+    private CdcManager cdcManager;
+    Serializer<CdcEvent> avroSerializer;
+
+    @Inject
+    public CdcPublisher(Vertx vertx,
+                        SidecarConfiguration sidecarConfiguration,
+                        ExecutorPools executorPools,
+                        ClusterConfigProvider clusterConfigProvider,
+                        SchemaSupplier schemaSupplier,
+                        CdcSidecarInstancesProvider sidecarInstancesProvider,
+                        SidecarCdcClient.ClientConfig clientConfig,
+                        InstanceMetadataFetcher instanceMetadataFetcher,
+                        CdcConfig conf,
+                        CdcDatabaseAccessor databaseAccessor,
+                        ICdcStats cdcStats,
+                        TokenRingProvider tokenRingProvider,
+                        VirtualTablesDatabaseAccessor virtualTables,
+                        SidecarCdcStats sidecarCdcStats,
+                        Serializer<CdcEvent> avroSerializer)
+    {
+        this.vertx = vertx;
+
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.executorPools = executorPools.internal();
+        this.conf = conf;
+        this.databaseAccessor = databaseAccessor;
+        this.tokenRingProvider = tokenRingProvider;
+        this.virtualTables = virtualTables;
+
+        this.schemaSupplier = schemaSupplier;
+        this.sidecarInstancesProvider = sidecarInstancesProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.clusterConfigProvider = clusterConfigProvider;
+        this.clientConfig = clientConfig;
+        this.cdcStats = cdcStats;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.avroSerializer = avroSerializer;
+        
vertx.eventBus().localConsumer(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address(),
 this);
+        vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CACHE_WARMED_UP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CONFIGURATION_CHANGED.address(), 
new ConfigChangedHandler());
+    }
+
+    public SecretsProvider secretsProvider()
+    {
+        SslConfiguration sslConfiguration = 
sidecarConfiguration.sidecarClientConfiguration().sslConfiguration();
+
+        if (sslConfiguration == null || !sslConfiguration.enabled())
+        {
+            return null;
+        }
+
+        Map<String, String> sslConfigMap = new HashMap<>();
+
+        sslConfigMap.put("enabled", sslConfiguration.enabled() + "");
+        sslConfigMap.put("preferOpenSSL", sslConfiguration.preferOpenSSL() + 
"");
+        sslConfigMap.put("clientAuth", sslConfiguration.clientAuth());
+        sslConfigMap.put("cipherSuites", String.join(",", 
sslConfiguration.cipherSuites()));
+        sslConfigMap.put("secureTransportProtocols", String.join(",", 
sslConfiguration.secureTransportProtocols()));
+        sslConfigMap.put("handshakeTimeout", 
sslConfiguration.handshakeTimeout().toString());
+
+        if (sslConfiguration.isKeystoreConfigured())
+        {
+            KeyStoreConfiguration keystore = sslConfiguration.keystore();
+            sslConfigMap.put("keystorePath", keystore.path());
+            sslConfigMap.put("keystorePassword", keystore.password());
+            sslConfigMap.put("keystoreType", keystore.type());
+        }
+
+        if (sslConfiguration.isTrustStoreConfigured())
+        {
+            KeyStoreConfiguration truststore = sslConfiguration.truststore();
+            sslConfigMap.put("truststorePath", truststore.path());
+            sslConfigMap.put("truststorePassword", truststore.password());
+            sslConfigMap.put("truststoreType", truststore.type());
+        }
+
+        SslConfig sslConfig = SslConfig.create(sslConfigMap);
+        return new SslConfigSecretsProvider(sslConfig);
+    }
+
+    public EventConsumer eventConsumer(CdcConfig conf,
+                                       Serializer<CdcEvent> avroSerializer)
+    {
+        KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(conf.kafkaConfigs());
+        KafkaPublisher kafkaPublisher = new 
KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()),
+                                                           producer,
+                                                           avroSerializer,
+                                                           
conf.maxRecordSizeBytes(),
+                                                           
conf.failOnRecordTooLargeError(),
+                                                           
conf.failOnKafkaError(),
+                                                           CdcLogMode.FULL);
+        return new CdcEventConsumer(kafkaPublisher);
+    }
+
+    private class ConfigChangedHandler implements Handler<Message<Object>>
+    {
+        public void handle(Message<Object> event)
+        {
+            sidecarCdcStats.captureCdcConfigChange();
+            // Execute restart on worker thread to avoid blocking event loop
+            executorPools.executeBlocking(() -> {
+                restart();
+                return null;
+            });
+        }
+    }
+
+    @SuppressWarnings("resource")
+    private synchronized void run() throws IllegalStateException
+    {
+        if (isRunning)
+        {
+            return;
+        }
+        databaseAccessor.session(); // throws IllegalStateException if session 
unavailable
+
+        cdcManager = new CdcManager(eventConsumer(conf, avroSerializer),
+                                    schemaSupplier,
+                                    conf,
+                                    rangeManager,
+                                    instanceMetadataFetcher,
+                                    clusterConfigProvider,
+                                    sidecarInstancesProvider,
+                                    secretsProvider(),
+                                    clientConfig,
+                                    cdcStats,
+                                    this.executorPools,
+                                    databaseAccessor);
+
+        List<SidecarCdc> consumers = cdcManager.buildCdcConsumers();
+        cdcManager.startConsumers();
+        LOGGER.info("{} CDC iterators started successfully", consumers.size());
+        isRunning = true;
+        sidecarCdcStats.captureCdcStarted(consumers.size());
+    }
+
+    protected synchronized void restart()
+    {
+        try
+        {
+            stop();
+            initialize();
+
+            LOGGER.info("Iterators restarted.");
+            sidecarCdcStats.captureCdcRestart();
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Failed to restart iterators", e);
+            sidecarCdcStats.captureCdcStartFailure(e);
+        }
+    }
+
+    public boolean isRunning()
+    {
+        return isRunning;
+    }
+
+    public synchronized void stop()
+    {
+        if (!isRunning)
+        {
+            return;
+        }
+
+        try
+        {
+            cdcManager.stopConsumers();
+            sidecarCdcStats.captureCdcStopped();
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Failed to gracefully shutdown CDC", t);
+            sidecarCdcStats.captureCdcStopFailed(t);
+        }
+        finally
+        {
+            isRunning = false;
+            isInitialized = false;
+        }
+    }
+
+    private void initialize()
+    {
+        try
+        {
+            if (this.rangeManager == null)
+            {
+                this.rangeManager = new ContentionFreeRangeManager(vertx, 
tokenRingProvider);
+            }
+            String localDc = rangeManager.getLocalDcSafe();
+            if (conf.datacenter() != null && !conf.datacenter().isEmpty() && 
!conf.datacenter().equals(localDc))
+            {
+                LOGGER.info("Cdc not enabled in this DC localDc={} cdcDc={}", 
localDc, conf.datacenter());
+                return;

Review Comment:
   Return not needed here and other if else blocks



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable volatile TableSchemaPublisher publisher;
+    private final CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    private final SidecarCdcStats sidecarCdcStats;
+
+    @Inject
+    CachingSchemaStore(Vertx vertx,
+                       CassandraClusterSchemaMonitor 
cassandraClusterSchemaMonitor,
+                       TableHistoryDatabaseAccessor 
tableHistoryDatabaseAccessor,
+                       CdcConfigImpl cdcConfig,
+                       SidecarCdcStats sidecarCdcStats,
+                       SidecarSchema sidecarSchema,
+                       CqlToAvroSchemaConverter cqlToAvroSchemaConverter)
+    {
+        super();
+        this.cassandraClusterSchemaMonitor = cassandraClusterSchemaMonitor;
+        this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
+        this.sidecarSchema = sidecarSchema;
+        this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
+        AvroSchemas.registerLogicalTypes();
+        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
+        this.vertx = vertx;
+        this.cdcConfig = cdcConfig;
+        this.sidecarCdcStats = sidecarCdcStats;
+
+        configureSidecarServerEventListeners();
+    }
+
+    private void loadPublisher()
+    {
+        KafkaOptions kafkaOptions = new KafkaOptions()
+        {
+            public Map<String, Object> kafkaConfigs()
+            {
+                return cdcConfig.kafkaConfigs();
+            }
+        };
+        this.publisher = 
SchemaStorePublisherFactory.DEFAULT.buildPublisher(kafkaOptions);
+    }
+
+    private void configureSidecarServerEventListeners()
+    {
+        EventBus eventBus = vertx.eventBus();
+
+        eventBus.localConsumer(ON_SERVER_START.address(), startMessage -> {
+            eventBus.localConsumer(ON_SIDECAR_SCHEMA_INITIALIZED.address(), 
message -> {
+                LOGGER.debug("Sidecar Schema initialized message={}", message);
+                Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+                for (CqlTable cqlTable : refreshedCdcTables)
+                {
+                    TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+                    avroSchemasCache.compute(tableIdentifier, (k, v) ->
+                    {
+                        
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
+                        return v;
+                    });
+                }
+                loadPublisher();
+                publishSchemas();
+            });
+        });
+    }
+
+    private void publishSchemas()
+    {
+        Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+        for (CqlTable cqlTable : refreshedCdcTables)
+        {
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+            avroSchemasCache.compute(tableIdentifier, (k, v) ->
+            {
+                if (null != publisher)
+                {
+                    Schema schema = cqlToAvroSchemaConverter.convert(cqlTable);
+                    TableSchemaPublisher.SchemaPublishMetadata metadata = new 
TableSchemaPublisher.SchemaPublishMetadata();
+                    metadata.put("name", cqlTable.table());
+                    metadata.put("namespace", cqlTable.keyspace());
+                    publisher.publishSchema(schema.toString(false), metadata);
+                    sidecarCdcStats.capturePublishedSchema();
+                }
+                return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+            });
+        }
+    }
+
+    @VisibleForTesting
+    void onSchemaChanged()
+    {
+        Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+        for (CqlTable cqlTable : refreshedCdcTables)
+        {
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+            avroSchemasCache.compute(tableIdentifier, (k, v) ->
+            {
+                if (v == null || 
!v.tableSchema().equals(cqlTable.createStatement()))
+                {
+                    if (sidecarSchema.isInitialized())
+                    {
+                        
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
+                    }
+                    LOGGER.info("Re-generating Avro Schema after schema change 
keyspace={} table={}", tableIdentifier.keyspace(), tableIdentifier.table());
+                    return new SchemaCacheEntry(cqlTable, 
cqlToAvroSchemaConverter);
+                }
+                return v;
+            });
+            loadPublisher();
+            publishSchemas();
+        }
+        // Remove any old schema entries for deleted tables, this operation 
can be done in the end as this is
+        // only for removing stale entries and no one is going to use these 
entries once the table is removed.
+        // This doesn't have to be an atomic operation.
+        
avroSchemasCache.keySet().retainAll(refreshedCdcTables.stream().map(cqlTable -> 
TableIdentifier.of(cqlTable.keyspace(), 
cqlTable.table())).collect(Collectors.toList()));
+        vertx.eventBus().publish(ON_CDC_CACHE_WARMED_UP.address(), "Cdc cache 
warmed up");
+    }
+
+    @Override
+    public Schema getSchema(String namespace, String name)
+    {
+        TableIdentifier tableIdentifier = 
getTableIdentifierFromNamespace(namespace);
+        return avroSchemasCache.computeIfAbsent(tableIdentifier, k ->
+        {
+            LOGGER.warn("Unknown table for getting schema keyspace={} 
table={}", tableIdentifier.keyspace(), tableIdentifier.table());
+            throw new RuntimeException("Unable to get schema for unknown table 
" + tableIdentifier);
+        }).schema;
+    }
+
+    @Override
+    public GenericDatumWriter<GenericRecord> getWriter(String namespace, 
String name)
+    {
+        TableIdentifier tableIdentifier = 
getTableIdentifierFromNamespace(namespace);
+        return avroSchemasCache.computeIfAbsent(tableIdentifier, k -> {
+            LOGGER.warn("Unknown table for getting writer keyspace={} 
table={}", tableIdentifier.keyspace(), tableIdentifier.table());
+            throw new RuntimeException("Unable to get writer for unknown table 
" + tableIdentifier);
+        }).writer;
+    }
+
+    @Override
+    public GenericDatumReader<GenericRecord> getReader(String namespace, 
String name)
+    {
+        TableIdentifier tableIdentifier = 
getTableIdentifierFromNamespace(namespace);
+        return avroSchemasCache.computeIfAbsent(tableIdentifier, k -> {
+            LOGGER.warn("Unknown table for getting reader keyspace={} 
table={}", tableIdentifier.keyspace(), tableIdentifier.table());
+            throw new RuntimeException("Unable to get reader for unknown table 
" + tableIdentifier);
+        }).reader;
+    }
+
+    @Override
+    public String getVersion(String namespace, String name)
+    {
+        TableIdentifier tableIdentifier = 
getTableIdentifierFromNamespace(namespace);
+        return avroSchemasCache.computeIfAbsent(tableIdentifier, k -> {
+            LOGGER.warn("Unknown table for getting reader keyspace={} 
table={}", tableIdentifier.keyspace(), tableIdentifier.table());
+            throw new RuntimeException("Unable to get reader for unknown table 
" + tableIdentifier);
+        }).schemaUuid;
+    }
+
+    public Map<String, Schema> getSchemas()
+    {
+        return avroSchemasCache.values().stream()
+                               .collect(Collectors.toMap(e -> 
e.schema.getNamespace(), e -> e.schema));
+    }

Review Comment:
   Unused method, do we need it in future patches?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaPublisher;
+import org.apache.cassandra.cdc.kafka.TopicSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
+/**
+ * Class that handles CDC life cycle
+ */
+@Singleton
+public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
+{
+    static final Logger LOGGER = LoggerFactory.getLogger(CdcPublisher.class);
+    static final long INITIALIZATION_LOOP_DELAY_MILLIS = 1000;
+    private final Vertx vertx;
+    private final TaskExecutorPool executorPools;
+    private final CdcConfig conf;
+    private volatile boolean isRunning = false;
+    private volatile boolean isInitialized = false;
+    private volatile boolean cdcCacheWarmedUp = false;
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final VirtualTablesDatabaseAccessor virtualTables;
+    private final SidecarCdcStats sidecarCdcStats;
+    private RangeManager rangeManager;
+    private final TokenRingProvider tokenRingProvider;
+    private final SchemaSupplier schemaSupplier;
+    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
+    private final InstanceMetadataFetcher instanceMetadataFetcher;
+    private final ClusterConfigProvider clusterConfigProvider;
+    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final ICdcStats cdcStats;
+    private final SidecarConfiguration sidecarConfiguration;
+    private CdcManager cdcManager;
+    Serializer<CdcEvent> avroSerializer;
+
+    @Inject
+    public CdcPublisher(Vertx vertx,
+                        SidecarConfiguration sidecarConfiguration,
+                        ExecutorPools executorPools,
+                        ClusterConfigProvider clusterConfigProvider,
+                        SchemaSupplier schemaSupplier,
+                        CdcSidecarInstancesProvider sidecarInstancesProvider,
+                        SidecarCdcClient.ClientConfig clientConfig,
+                        InstanceMetadataFetcher instanceMetadataFetcher,
+                        CdcConfig conf,
+                        CdcDatabaseAccessor databaseAccessor,
+                        ICdcStats cdcStats,
+                        TokenRingProvider tokenRingProvider,
+                        VirtualTablesDatabaseAccessor virtualTables,
+                        SidecarCdcStats sidecarCdcStats,
+                        Serializer<CdcEvent> avroSerializer)
+    {
+        this.vertx = vertx;
+
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.executorPools = executorPools.internal();
+        this.conf = conf;
+        this.databaseAccessor = databaseAccessor;
+        this.tokenRingProvider = tokenRingProvider;
+        this.virtualTables = virtualTables;
+
+        this.schemaSupplier = schemaSupplier;
+        this.sidecarInstancesProvider = sidecarInstancesProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.clusterConfigProvider = clusterConfigProvider;
+        this.clientConfig = clientConfig;
+        this.cdcStats = cdcStats;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.avroSerializer = avroSerializer;
+        
vertx.eventBus().localConsumer(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address(),
 this);
+        vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CACHE_WARMED_UP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CONFIGURATION_CHANGED.address(), 
new ConfigChangedHandler());
+    }
+
+    public SecretsProvider secretsProvider()
+    {
+        SslConfiguration sslConfiguration = 
sidecarConfiguration.sidecarClientConfiguration().sslConfiguration();
+
+        if (sslConfiguration == null || !sslConfiguration.enabled())
+        {
+            return null;
+        }
+
+        Map<String, String> sslConfigMap = new HashMap<>();
+
+        sslConfigMap.put("enabled", sslConfiguration.enabled() + "");
+        sslConfigMap.put("preferOpenSSL", sslConfiguration.preferOpenSSL() + 
"");
+        sslConfigMap.put("clientAuth", sslConfiguration.clientAuth());
+        sslConfigMap.put("cipherSuites", String.join(",", 
sslConfiguration.cipherSuites()));
+        sslConfigMap.put("secureTransportProtocols", String.join(",", 
sslConfiguration.secureTransportProtocols()));
+        sslConfigMap.put("handshakeTimeout", 
sslConfiguration.handshakeTimeout().toString());
+
+        if (sslConfiguration.isKeystoreConfigured())
+        {
+            KeyStoreConfiguration keystore = sslConfiguration.keystore();
+            sslConfigMap.put("keystorePath", keystore.path());
+            sslConfigMap.put("keystorePassword", keystore.password());
+            sslConfigMap.put("keystoreType", keystore.type());
+        }
+
+        if (sslConfiguration.isTrustStoreConfigured())
+        {
+            KeyStoreConfiguration truststore = sslConfiguration.truststore();
+            sslConfigMap.put("truststorePath", truststore.path());
+            sslConfigMap.put("truststorePassword", truststore.password());
+            sslConfigMap.put("truststoreType", truststore.type());
+        }
+
+        SslConfig sslConfig = SslConfig.create(sslConfigMap);
+        return new SslConfigSecretsProvider(sslConfig);
+    }
+
+    public EventConsumer eventConsumer(CdcConfig conf,
+                                       Serializer<CdcEvent> avroSerializer)
+    {
+        KafkaProducer<String, byte[]> producer = new 
KafkaProducer<>(conf.kafkaConfigs());
+        KafkaPublisher kafkaPublisher = new 
KafkaPublisher(TopicSupplier.staticTopicSupplier(conf.kafkaTopic()),
+                                                           producer,
+                                                           avroSerializer,
+                                                           
conf.maxRecordSizeBytes(),
+                                                           
conf.failOnRecordTooLargeError(),
+                                                           
conf.failOnKafkaError(),
+                                                           CdcLogMode.FULL);
+        return new CdcEventConsumer(kafkaPublisher);
+    }
+
+    private class ConfigChangedHandler implements Handler<Message<Object>>
+    {
+        public void handle(Message<Object> event)
+        {
+            sidecarCdcStats.captureCdcConfigChange();
+            // Execute restart on worker thread to avoid blocking event loop
+            executorPools.executeBlocking(() -> {
+                restart();
+                return null;
+            });
+        }
+    }
+
+    @SuppressWarnings("resource")
+    private synchronized void run() throws IllegalStateException
+    {
+        if (isRunning)
+        {
+            return;
+        }
+        databaseAccessor.session(); // throws IllegalStateException if session 
unavailable
+
+        cdcManager = new CdcManager(eventConsumer(conf, avroSerializer),
+                                    schemaSupplier,
+                                    conf,
+                                    rangeManager,
+                                    instanceMetadataFetcher,
+                                    clusterConfigProvider,
+                                    sidecarInstancesProvider,
+                                    secretsProvider(),
+                                    clientConfig,
+                                    cdcStats,
+                                    this.executorPools,
+                                    databaseAccessor);
+
+        List<SidecarCdc> consumers = cdcManager.buildCdcConsumers();
+        cdcManager.startConsumers();
+        LOGGER.info("{} CDC iterators started successfully", consumers.size());
+        isRunning = true;
+        sidecarCdcStats.captureCdcStarted(consumers.size());
+    }
+
+    protected synchronized void restart()
+    {
+        try
+        {
+            stop();
+            initialize();
+
+            LOGGER.info("Iterators restarted.");
+            sidecarCdcStats.captureCdcRestart();
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Failed to restart iterators", e);
+            sidecarCdcStats.captureCdcStartFailure(e);
+        }
+    }
+
+    public boolean isRunning()
+    {
+        return isRunning;
+    }
+
+    public synchronized void stop()
+    {
+        if (!isRunning)
+        {
+            return;
+        }
+
+        try
+        {
+            cdcManager.stopConsumers();
+            sidecarCdcStats.captureCdcStopped();
+        }
+        catch (Throwable t)
+        {
+            LOGGER.error("Failed to gracefully shutdown CDC", t);
+            sidecarCdcStats.captureCdcStopFailed(t);
+        }
+        finally
+        {
+            isRunning = false;
+            isInitialized = false;
+        }
+    }
+
+    private void initialize()
+    {
+        try
+        {
+            if (this.rangeManager == null)
+            {
+                this.rangeManager = new ContentionFreeRangeManager(vertx, 
tokenRingProvider);

Review Comment:
   Why are we hardcoding range manager to be `ContentionFreeRangeManager` 
shouldn't that be injected?
   
   `ContentionFreeRangeManager` always succeeds on propose and release 
ownership.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable volatile TableSchemaPublisher publisher;
+    private final CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    private final SidecarCdcStats sidecarCdcStats;
+
+    @Inject
+    CachingSchemaStore(Vertx vertx,
+                       CassandraClusterSchemaMonitor 
cassandraClusterSchemaMonitor,
+                       TableHistoryDatabaseAccessor 
tableHistoryDatabaseAccessor,
+                       CdcConfigImpl cdcConfig,
+                       SidecarCdcStats sidecarCdcStats,
+                       SidecarSchema sidecarSchema,
+                       CqlToAvroSchemaConverter cqlToAvroSchemaConverter)
+    {
+        super();
+        this.cassandraClusterSchemaMonitor = cassandraClusterSchemaMonitor;
+        this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
+        this.sidecarSchema = sidecarSchema;
+        this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
+        AvroSchemas.registerLogicalTypes();
+        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
+        this.vertx = vertx;
+        this.cdcConfig = cdcConfig;
+        this.sidecarCdcStats = sidecarCdcStats;
+
+        configureSidecarServerEventListeners();
+    }
+
+    private void loadPublisher()
+    {
+        KafkaOptions kafkaOptions = new KafkaOptions()
+        {
+            public Map<String, Object> kafkaConfigs()
+            {
+                return cdcConfig.kafkaConfigs();
+            }
+        };

Review Comment:
   Can this be lambda instead?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcSchemaSupplier.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import com.google.inject.Singleton;
+import org.apache.cassandra.bridge.CassandraBridge;
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.bridge.CdcBridge;
+import org.apache.cassandra.bridge.CdcBridgeFactory;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.sidecar.common.response.NodeSettings;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.utils.CdcUtil;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.utils.CqlUtils;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+import org.jetbrains.annotations.NotNull;
+
+
+/**
+ * Class providing the schema for CDC

Review Comment:
   May be more descriptive documentation about the class?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcPublisher.java:
##########
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.cdc.CdcLogMode;
+import org.apache.cassandra.cdc.api.EventConsumer;
+import org.apache.cassandra.cdc.api.SchemaSupplier;
+import org.apache.cassandra.cdc.kafka.KafkaPublisher;
+import org.apache.cassandra.cdc.kafka.TopicSupplier;
+import org.apache.cassandra.cdc.msg.CdcEvent;
+import org.apache.cassandra.cdc.sidecar.CdcSidecarInstancesProvider;
+import org.apache.cassandra.cdc.sidecar.ClusterConfigProvider;
+import org.apache.cassandra.cdc.sidecar.SidecarCdc;
+import org.apache.cassandra.cdc.sidecar.SidecarCdcClient;
+import org.apache.cassandra.cdc.stats.ICdcStats;
+import org.apache.cassandra.secrets.SecretsProvider;
+import org.apache.cassandra.secrets.SslConfig;
+import org.apache.cassandra.secrets.SslConfigSecretsProvider;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.concurrent.TaskExecutorPool;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.SslConfiguration;
+import org.apache.cassandra.sidecar.coordination.ContentionFreeRangeManager;
+import org.apache.cassandra.sidecar.coordination.RangeManager;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.apache.cassandra.sidecar.db.CdcDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.VirtualTablesDatabaseAccessor;
+import org.apache.cassandra.sidecar.tasks.PeriodicTask;
+import org.apache.cassandra.sidecar.tasks.ScheduleDecision;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.Serializer;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIGURATION_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
+/**
+ * Class that handles CDC life cycle
+ */
+@Singleton
+public class CdcPublisher implements Handler<Message<Object>>, PeriodicTask
+{
+    static final Logger LOGGER = LoggerFactory.getLogger(CdcPublisher.class);
+    static final long INITIALIZATION_LOOP_DELAY_MILLIS = 1000;
+    private final Vertx vertx;
+    private final TaskExecutorPool executorPools;
+    private final CdcConfig conf;
+    private volatile boolean isRunning = false;
+    private volatile boolean isInitialized = false;
+    private volatile boolean cdcCacheWarmedUp = false;
+    private final CdcDatabaseAccessor databaseAccessor;
+    private final VirtualTablesDatabaseAccessor virtualTables;
+    private final SidecarCdcStats sidecarCdcStats;
+    private RangeManager rangeManager;
+    private final TokenRingProvider tokenRingProvider;
+    private final SchemaSupplier schemaSupplier;
+    private final CdcSidecarInstancesProvider sidecarInstancesProvider;
+    private final InstanceMetadataFetcher instanceMetadataFetcher;
+    private final ClusterConfigProvider clusterConfigProvider;
+    private final SidecarCdcClient.ClientConfig clientConfig;
+    private final ICdcStats cdcStats;
+    private final SidecarConfiguration sidecarConfiguration;
+    private CdcManager cdcManager;
+    Serializer<CdcEvent> avroSerializer;
+
+    @Inject
+    public CdcPublisher(Vertx vertx,
+                        SidecarConfiguration sidecarConfiguration,
+                        ExecutorPools executorPools,
+                        ClusterConfigProvider clusterConfigProvider,
+                        SchemaSupplier schemaSupplier,
+                        CdcSidecarInstancesProvider sidecarInstancesProvider,
+                        SidecarCdcClient.ClientConfig clientConfig,
+                        InstanceMetadataFetcher instanceMetadataFetcher,
+                        CdcConfig conf,
+                        CdcDatabaseAccessor databaseAccessor,
+                        ICdcStats cdcStats,
+                        TokenRingProvider tokenRingProvider,
+                        VirtualTablesDatabaseAccessor virtualTables,
+                        SidecarCdcStats sidecarCdcStats,
+                        Serializer<CdcEvent> avroSerializer)
+    {
+        this.vertx = vertx;
+
+        this.sidecarCdcStats = sidecarCdcStats;
+        this.executorPools = executorPools.internal();
+        this.conf = conf;
+        this.databaseAccessor = databaseAccessor;
+        this.tokenRingProvider = tokenRingProvider;
+        this.virtualTables = virtualTables;
+
+        this.schemaSupplier = schemaSupplier;
+        this.sidecarInstancesProvider = sidecarInstancesProvider;
+        this.instanceMetadataFetcher = instanceMetadataFetcher;
+        this.clusterConfigProvider = clusterConfigProvider;
+        this.clientConfig = clientConfig;
+        this.cdcStats = cdcStats;
+        this.sidecarConfiguration = sidecarConfiguration;
+        this.avroSerializer = avroSerializer;
+        
vertx.eventBus().localConsumer(RangeManager.RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_GAINED.address(),
 this);
+        
vertx.eventBus().localConsumer(RangeManager.LeadershipEvents.ON_TOKEN_RANGE_LOST.address(),
 this);
+        vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CACHE_WARMED_UP.address(), this);
+        vertx.eventBus().localConsumer(ON_CDC_CONFIGURATION_CHANGED.address(), 
new ConfigChangedHandler());
+    }
+
+    public SecretsProvider secretsProvider()
+    {
+        SslConfiguration sslConfiguration = 
sidecarConfiguration.sidecarClientConfiguration().sslConfiguration();
+
+        if (sslConfiguration == null || !sslConfiguration.enabled())
+        {
+            return null;
+        }
+
+        Map<String, String> sslConfigMap = new HashMap<>();
+
+        sslConfigMap.put("enabled", sslConfiguration.enabled() + "");
+        sslConfigMap.put("preferOpenSSL", sslConfiguration.preferOpenSSL() + 
"");
+        sslConfigMap.put("clientAuth", sslConfiguration.clientAuth());
+        sslConfigMap.put("cipherSuites", String.join(",", 
sslConfiguration.cipherSuites()));
+        sslConfigMap.put("secureTransportProtocols", String.join(",", 
sslConfiguration.secureTransportProtocols()));
+        sslConfigMap.put("handshakeTimeout", 
sslConfiguration.handshakeTimeout().toString());

Review Comment:
   Should we make these and other keys constants?



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcAvroSerializer.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import org.apache.cassandra.bridge.CassandraBridgeFactory;
+import org.apache.cassandra.cdc.TypeCache;
+import org.apache.cassandra.cdc.kafka.AvroGenericRecordSerializer;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+/**
+ * Taking a schema, this class serializes a CDC into AVRO format

Review Comment:
   May be better description for the class, how about the following?
   
   > Serializer to convert Cassandra CDC events into Avro GenericRecord objects.



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CachingSchemaStore.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cdc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.EventBus;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.cassandra.cdc.avro.AvroSchemas;
+import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
+import org.apache.cassandra.cdc.kafka.KafkaOptions;
+import org.apache.cassandra.cdc.schemastore.SchemaStore;
+import org.apache.cassandra.cdc.schemastore.SchemaStorePublisherFactory;
+import org.apache.cassandra.cdc.schemastore.TableSchemaPublisher;
+import org.apache.cassandra.sidecar.db.TableHistoryDatabaseAccessor;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.tasks.CassandraClusterSchemaMonitor;
+import org.apache.cassandra.spark.data.CqlTable;
+import org.apache.cassandra.spark.utils.TableIdentifier;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CACHE_WARMED_UP;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_START;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Schemas cache to be used by CDC event serialization.
+ */
+@Singleton
+public class CachingSchemaStore implements SchemaStore
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachingSchemaStore.class);
+    private final Map<TableIdentifier, SchemaCacheEntry> avroSchemasCache = 
new ConcurrentHashMap<>();
+    private final CassandraClusterSchemaMonitor cassandraClusterSchemaMonitor;
+    private final SidecarSchema sidecarSchema;
+    private final TableHistoryDatabaseAccessor tableHistoryDatabaseAccessor;
+    private final Vertx vertx;
+    private final CdcConfigImpl cdcConfig;
+    @Nullable volatile TableSchemaPublisher publisher;
+    private final CqlToAvroSchemaConverter cqlToAvroSchemaConverter;
+    private final SidecarCdcStats sidecarCdcStats;
+
+    @Inject
+    CachingSchemaStore(Vertx vertx,
+                       CassandraClusterSchemaMonitor 
cassandraClusterSchemaMonitor,
+                       TableHistoryDatabaseAccessor 
tableHistoryDatabaseAccessor,
+                       CdcConfigImpl cdcConfig,
+                       SidecarCdcStats sidecarCdcStats,
+                       SidecarSchema sidecarSchema,
+                       CqlToAvroSchemaConverter cqlToAvroSchemaConverter)
+    {
+        super();
+        this.cassandraClusterSchemaMonitor = cassandraClusterSchemaMonitor;
+        this.tableHistoryDatabaseAccessor = tableHistoryDatabaseAccessor;
+        this.sidecarSchema = sidecarSchema;
+        this.cqlToAvroSchemaConverter = cqlToAvroSchemaConverter;
+        
this.avroSchemasCache.putAll(createSchemaCache(cassandraClusterSchemaMonitor.getCdcTables()));
+        AvroSchemas.registerLogicalTypes();
+        
cassandraClusterSchemaMonitor.addSchemaChangeListener(this::onSchemaChanged);
+        this.vertx = vertx;
+        this.cdcConfig = cdcConfig;
+        this.sidecarCdcStats = sidecarCdcStats;
+
+        configureSidecarServerEventListeners();
+    }
+
+    private void loadPublisher()
+    {
+        KafkaOptions kafkaOptions = new KafkaOptions()
+        {
+            public Map<String, Object> kafkaConfigs()
+            {
+                return cdcConfig.kafkaConfigs();
+            }
+        };
+        this.publisher = 
SchemaStorePublisherFactory.DEFAULT.buildPublisher(kafkaOptions);
+    }
+
+    private void configureSidecarServerEventListeners()
+    {
+        EventBus eventBus = vertx.eventBus();
+
+        eventBus.localConsumer(ON_SERVER_START.address(), startMessage -> {
+            eventBus.localConsumer(ON_SIDECAR_SCHEMA_INITIALIZED.address(), 
message -> {
+                LOGGER.debug("Sidecar Schema initialized message={}", message);
+                Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+                for (CqlTable cqlTable : refreshedCdcTables)
+                {
+                    TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+                    avroSchemasCache.compute(tableIdentifier, (k, v) ->
+                    {
+                        
tableHistoryDatabaseAccessor.insertTableSchemaHistory(cqlTable.keyspace(), 
cqlTable.table(), cqlTable.createStatement());
+                        return v;
+                    });
+                }
+                loadPublisher();
+                publishSchemas();
+            });
+        });
+    }
+
+    private void publishSchemas()
+    {
+        Set<CqlTable> refreshedCdcTables = 
cassandraClusterSchemaMonitor.getCdcTables();
+        for (CqlTable cqlTable : refreshedCdcTables)
+        {
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(cqlTable.keyspace(), cqlTable.table());
+            avroSchemasCache.compute(tableIdentifier, (k, v) ->
+            {
+                if (null != publisher)
+                {
+                    Schema schema = cqlToAvroSchemaConverter.convert(cqlTable);
+                    TableSchemaPublisher.SchemaPublishMetadata metadata = new 
TableSchemaPublisher.SchemaPublishMetadata();
+                    metadata.put("name", cqlTable.table());
+                    metadata.put("namespace", cqlTable.keyspace());

Review Comment:
   Can we have constants for the keys?



##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/RangeManager.java:
##########
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.coordination;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.eventbus.Message;
+import org.apache.cassandra.sidecar.client.SidecarInstance;
+import org.apache.cassandra.sidecar.codecs.RangeChangeEventCodec;
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import org.apache.cassandra.spark.utils.Pair;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_DOWN;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_PEER_UP;
+import static 
org.apache.cassandra.sidecar.tasks.ClusterTopologyMonitor.ClusterTopologyEventType.ON_DC_TOPOLOGY_CHANGE;
+
+/**
+ * This class manages the token ranges owned by this Sidecar instance and 
listens on the DownDetector for other Sidecar instances going up/down.
+ * The underlying implementation can implement a consensus algorithm to 
provide strong guarantees around gaining/releasing token range ownership.
+ */
+public abstract class RangeManager implements Handler<Message<Object>>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RangeManager.class);
+
+    /**
+     * Events published when token range ownership changes.
+     */
+    public enum LeadershipEvents
+    {
+        ON_TOKEN_RANGE_GAINED,
+        ON_TOKEN_RANGE_LOST;
+
+        /**
+         * Returns the event bus address for this leadership event type.
+         */
+        public String address()
+        {
+            return LeadershipEvents.class.getName() + "." + name();
+        }
+    }
+
+    /**
+     * Events published when the token ring topology changes.
+     */
+    public enum RangeManagerEvents
+    {
+        ON_TOKEN_RANGE_CHANGED;
+
+        /**
+         * Returns the event bus address for this range manager event type.
+         */
+        public String address()
+        {
+            return RangeManagerEvents.class.getName() + "." + name();
+        }
+    }
+
+    /**
+     * Event data containing information about token range ownership changes.
+     */
+    public static class RangeChangeEvent
+    {
+        public final ImmutableMap<String, Set<TokenRange>> change; // ranges 
gained/lost by this change
+        public final ImmutableMap<String, Set<TokenRange>> newView; // new 
view of the ring after this change
+
+        /**
+         * Creates a new range change event with the specified changes and new 
view.
+         */
+        public RangeChangeEvent(Map<String, Set<TokenRange>> change, 
Map<String, Set<TokenRange>> newView)
+        {
+            this.change = ImmutableMap.copyOf(change);
+            this.newView = ImmutableMap.copyOf(newView);
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            RangeChangeEvent event = (RangeChangeEvent) o;
+            return Objects.equals(change, event.change) && 
Objects.equals(newView, event.newView);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(change, newView);
+        }
+    }
+
+    protected final Vertx vertx;
+    protected final TokenRingProvider tokenRingProvider;
+    @Nullable
+    protected volatile ImmutableMap<String, Set<TokenRange>> ownedRanges = 
null;
+
+    public RangeManager(Vertx vertx, TokenRingProvider tokenRingProvider)
+    {
+        this.vertx = vertx;
+        this.tokenRingProvider = tokenRingProvider;
+        initializeRanges();
+        try
+        {
+            vertx.eventBus().registerDefaultCodec(RangeChangeEvent.class, 
RangeChangeEventCodec.INSTANCE);
+        }
+        catch (IllegalStateException ise)
+        {
+            if (ise.getMessage().contains("Already a default codec 
registered"))
+            {
+                // TODO: do not register codec in the constructor when OSS; 
All codecs should be managed in a central repository
+                // ignore
+            }
+            else
+            {
+                throw ise;
+            }
+        }
+        vertx.eventBus().localConsumer(ON_CASSANDRA_CQL_READY.address(), this);
+        vertx.eventBus().localConsumer(ON_SIDECAR_PEER_UP.address(), this);
+        vertx.eventBus().localConsumer(ON_SIDECAR_PEER_DOWN.address(), this);
+        vertx.eventBus().localConsumer(ON_DC_TOPOLOGY_CHANGE.address(), this);
+    }
+
+    /**
+     * Returns the token ranges currently owned by this Sidecar instance.
+     */
+    @Nullable
+    public Map<String, Set<TokenRange>> ownedTokenRanges()
+    {
+        return this.ownedRanges;
+    }
+
+    // Sidecar coordination
+
+    /**
+     * @param primaryOwner SidecarInstance that is the primary owner for the 
token ranges.
+     * @param ranges       token ranges this SidecarInstance wishes to gain 
ownership.
+     * @return a future that completes when propose request completes, 
returning the ranges that were successfully gained as part of the request.
+     */
+    /**
+     * Proposes to gain ownership of the specified token ranges.
+     */
+    abstract Future<Boolean> proposeOwnership(SidecarInstance primaryOwner, 
Map<String, Set<TokenRange>> ranges);
+
+    /**
+     * @param primaryOwner SidecarInstance that is the primary owner for the 
token ranges.
+     * @param ranges       token ranges this SidecarInstance wishes to release.
+     * @return a future that completes when release request completes, 
returning the ranges that were released as part of the request.
+     */
+    /**
+     * Proposes to release ownership of the specified token ranges.
+     */
+    abstract Future<Boolean> releaseOwnership(SidecarInstance primaryOwner, 
Map<String, Set<TokenRange>> ranges);
+
+    // DownDetector notification handler
+
+    /**
+     * Handles event bus messages for peer up/down and topology changes.
+     */
+    @Override
+    public void handle(Message<Object> msg)
+    {
+        if (ON_SIDECAR_PEER_UP.address().equals(msg.address()))
+        {
+            onSidecarUp((SidecarInstance) msg.body());
+        }
+        else if (ON_SIDECAR_PEER_DOWN.address().equals(msg.address()))
+        {
+            onSidecarDown((SidecarInstance) msg.body());
+        }
+        else if (ON_CASSANDRA_CQL_READY.address().equals(msg.address()) || 
ON_DC_TOPOLOGY_CHANGE.address().equals(msg.address()))
+        {
+            initializeRanges();
+        }
+    }
+
+    /**
+     * Initializes owned token ranges from the token ring provider.
+     */
+    protected synchronized void initializeRanges()
+    {
+        LOGGER.info("Initializing ranges");
+        final String dc = getLocalDcSafe();
+        if (dc == null)
+        {
+            LOGGER.error("DC null, cannot initialize owned ranges");
+            return;
+        }
+        ImmutableMap<String, Set<TokenRange>> newRanges = 
ImmutableMap.copyOf(toSet(tokenRingProvider.getPrimaryTokenRanges(dc)));
+        if (!newRanges.equals(this.ownedRanges))
+        {
+            this.ownedRanges = newRanges;
+            
vertx.eventBus().publish(RangeManagerEvents.ON_TOKEN_RANGE_CHANGED.address(), 
null);
+        }
+    }
+
+    /**
+     * Handles a Sidecar instance coming online by releasing its primary 
ranges.
+     */
+    protected synchronized void onSidecarUp(SidecarInstance instance)
+    {
+        // on Sidecar detected to be UP, initiate proposal to release 
ownership of its primary ranges
+        final String dc = getLocalDcSafe();
+        if (dc == null)
+        {
+            LOGGER.error("DC null, cannot respond to onSidecarUp event");
+            return;
+        }
+        Map<String, Set<TokenRange>> primaryRanges = 
toSet(tokenRingProvider.getPrimaryRanges(instance, dc));

Review Comment:
   Why is sidecar releasing ownership if the peer is up? Don't we run into 
situation where all sidecar instances release ownership and there will be no 
owner for the token range?



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcConfigRefresherNotifierTask.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.codecs.CdcConfigMappingsCodec;
+import org.apache.cassandra.sidecar.common.server.ThrowingRunnable;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.db.CdcConfigAccessor;
+import org.apache.cassandra.sidecar.db.KafkaConfigAccessor;
+import org.apache.cassandra.sidecar.utils.EventBusUtils;
+
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CDC_CONFIG_MAPPINGS_CHANGED;
+import static 
org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SIDECAR_SCHEMA_INITIALIZED;
+
+/**
+ * Periodic task notifying if the CDC config changed
+ */
+@Singleton
+public class CdcConfigRefresherNotifierTask implements PeriodicTask
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcConfigRefresherNotifierTask.class);
+    private final SidecarConfiguration sidecarConfiguration;
+    private final CdcConfigAccessor cdcConfigAccessor;
+    private final KafkaConfigAccessor kafkaConfigAccessor;
+    private final Vertx vertx;
+    private final List<ThrowingRunnable> configChangeListeners = 
Collections.synchronizedList(new ArrayList<>());
+    public ConfigMappings configMappings = new ConfigMappings();

Review Comment:
   can be private



##########
server/src/main/java/org/apache/cassandra/sidecar/coordination/TokenRingProvider.java:
##########
@@ -68,7 +67,7 @@ public TokenRingProvider(InstancesMetadata instancesMetadata, 
InstanceMetadataFe
      * @param dc          data center
      * @return map of token ranges per Cassandra instance IP
      */
-    protected abstract Map<String, List<Range<BigInteger>>> 
getAllTokenRanges(Partitioner partitioner, @Nullable String dc);
+    protected abstract Map<String, List<TokenRange>> 
getAllTokenRanges(Partitioner partitioner, @Nullable String dc);

Review Comment:
   Wondering why didn't we use `TokenRange` earlier, seems like we have that 
class available from before



##########
server/src/main/java/org/apache/cassandra/sidecar/db/TableHistoryDatabaseAccessor.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+
+/**
+ * Database accessor for Table History operations.
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class TableHistoryDatabaseAccessor extends 
DatabaseAccessor<TableHistorySchema>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TableHistoryDatabaseAccessor.class);

Review Comment:
   Unused logger.



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.codecs.DcLocalTopologyChangeEventCodec;
+
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Periodic task that monitors Cassandra cluster topology changes and 
publishes events when detected.
+ */
+@Singleton
+public class ClusterTopologyMonitor implements PeriodicTask
+{
+    /**
+     * Event types published when cluster topology changes are detected.
+     */
+    public enum ClusterTopologyEventType
+    {
+        ON_DC_TOPOLOGY_CHANGE;
+
+        /**
+         * Returns the event bus address for this event type.
+         */
+        public String address()
+        {
+            return ClusterTopologyMonitor.class.getName() + "." + name();
+        }
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopologyMonitor.class);
+    private final Vertx vertx;
+    private final TokenRingProvider tokenRingProvider;
+
+    private final ConcurrentHashMap<String, Map<String, List<TokenRange>>> 
perDcRanges = new ConcurrentHashMap<>(4);
+
+    /**
+     * Event data containing datacenter-local topology change information.
+     */
+    public static class DcLocalTopologyChangeEvent
+    {
+        public final String dc;
+        @Nullable
+        public final Map<String, List<TokenRange>> prev;
+        @NotNull
+        public final Map<String, List<TokenRange>> curr;
+
+        /**
+         * Creates a new datacenter topology change event.
+         */
+        public DcLocalTopologyChangeEvent(String dc,
+                                          @Nullable Map<String, 
List<TokenRange>> prev,
+                                          @NotNull Map<String, 
List<TokenRange>> curr)
+        {
+            this.dc = dc;
+            this.prev = prev;
+            this.curr = curr;
+        }
+
+        /**
+         * Factory method to create a topology change event with instance ID 
keyed maps.
+         */
+        public static DcLocalTopologyChangeEvent of(String dc,
+                                                    @Nullable Map<String, 
List<TokenRange>> prev,
+                                                    @NotNull Map<String, 
List<TokenRange>> curr)
+        {
+            return new DcLocalTopologyChangeEvent(dc, keyByInstanceId(prev), 
keyByInstanceId(curr));
+        }
+
+        private static Map<String, List<TokenRange>> keyByInstanceId(@Nullable 
Map<String, List<TokenRange>> map)
+        {
+            return map == null ? null : map.entrySet().stream()
+                                           
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
+                                                                               
  Map.Entry::getValue));
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            DcLocalTopologyChangeEvent that = (DcLocalTopologyChangeEvent) o;
+            return Objects.equals(dc, that.dc) &&
+                   Objects.equals(prev, that.prev) &&
+                   curr.equals(that.curr);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(dc, prev, curr);
+        }
+    }
+
+    /**
+     * Creates a cluster topology monitor with periodic task scheduling.
+     */
+    @Inject
+    public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider 
tokenRingProvider, PeriodicTaskExecutor periodicTaskExecutor)
+    {
+        this(vertx, tokenRingProvider);
+        LOGGER.info("Starting Cluster Topology Monitor");
+        periodicTaskExecutor.schedule(this);
+        LOGGER.info("Cluster Topology Monitor started");
+    }
+
+    /**
+     * Creates a cluster topology monitor without scheduling.
+     */
+    public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider 
tokenRingProvider)
+    {
+        this.vertx = vertx;
+        this.tokenRingProvider = tokenRingProvider;
+        try
+        {
+            
vertx.eventBus().registerDefaultCodec(DcLocalTopologyChangeEvent.class, 
DcLocalTopologyChangeEventCodec.INSTANCE);
+        }
+        catch (IllegalStateException ise)
+        {
+            if (ise.getMessage().contains("Already a default codec 
registered"))
+            {
+                // TODO: do not register codec in the constructor when OSS; 
All codecs should be managed in a central repository
+                // ignore
+            }
+            else
+            {
+                throw ise;
+            }
+        }
+    }
+
+    /**
+     * Returns the delay between topology refresh cycles.
+     */
+    @Override
+    public DurationSpec delay()
+    {
+//        return 
MillisecondBoundConfiguration.parse(sidecarConfiguration.serviceConfiguration().refreshClusterTopology());

Review Comment:
   Can you please remove the commented code.



##########
server/src/main/java/org/apache/cassandra/sidecar/tasks/ClusterTopologyMonitor.java:
##########
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.tasks;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.codecs.DcLocalTopologyChangeEventCodec;
+
+import org.apache.cassandra.sidecar.common.server.cluster.locator.TokenRange;
+import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.coordination.TokenRingProvider;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Periodic task that monitors Cassandra cluster topology changes and 
publishes events when detected.
+ */
+@Singleton
+public class ClusterTopologyMonitor implements PeriodicTask
+{
+    /**
+     * Event types published when cluster topology changes are detected.
+     */
+    public enum ClusterTopologyEventType
+    {
+        ON_DC_TOPOLOGY_CHANGE;
+
+        /**
+         * Returns the event bus address for this event type.
+         */
+        public String address()
+        {
+            return ClusterTopologyMonitor.class.getName() + "." + name();
+        }
+    }
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterTopologyMonitor.class);
+    private final Vertx vertx;
+    private final TokenRingProvider tokenRingProvider;
+
+    private final ConcurrentHashMap<String, Map<String, List<TokenRange>>> 
perDcRanges = new ConcurrentHashMap<>(4);
+
+    /**
+     * Event data containing datacenter-local topology change information.
+     */
+    public static class DcLocalTopologyChangeEvent
+    {
+        public final String dc;
+        @Nullable
+        public final Map<String, List<TokenRange>> prev;
+        @NotNull
+        public final Map<String, List<TokenRange>> curr;
+
+        /**
+         * Creates a new datacenter topology change event.
+         */
+        public DcLocalTopologyChangeEvent(String dc,
+                                          @Nullable Map<String, 
List<TokenRange>> prev,
+                                          @NotNull Map<String, 
List<TokenRange>> curr)
+        {
+            this.dc = dc;
+            this.prev = prev;
+            this.curr = curr;
+        }
+
+        /**
+         * Factory method to create a topology change event with instance ID 
keyed maps.
+         */
+        public static DcLocalTopologyChangeEvent of(String dc,
+                                                    @Nullable Map<String, 
List<TokenRange>> prev,
+                                                    @NotNull Map<String, 
List<TokenRange>> curr)
+        {
+            return new DcLocalTopologyChangeEvent(dc, keyByInstanceId(prev), 
keyByInstanceId(curr));
+        }
+
+        private static Map<String, List<TokenRange>> keyByInstanceId(@Nullable 
Map<String, List<TokenRange>> map)
+        {
+            return map == null ? null : map.entrySet().stream()
+                                           
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey,
+                                                                               
  Map.Entry::getValue));
+        }
+
+        public boolean equals(Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            DcLocalTopologyChangeEvent that = (DcLocalTopologyChangeEvent) o;
+            return Objects.equals(dc, that.dc) &&
+                   Objects.equals(prev, that.prev) &&
+                   curr.equals(that.curr);
+        }
+
+        public int hashCode()
+        {
+            return Objects.hash(dc, prev, curr);
+        }
+    }
+
+    /**
+     * Creates a cluster topology monitor with periodic task scheduling.
+     */
+    @Inject
+    public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider 
tokenRingProvider, PeriodicTaskExecutor periodicTaskExecutor)
+    {
+        this(vertx, tokenRingProvider);
+        LOGGER.info("Starting Cluster Topology Monitor");
+        periodicTaskExecutor.schedule(this);
+        LOGGER.info("Cluster Topology Monitor started");
+    }
+
+    /**
+     * Creates a cluster topology monitor without scheduling.
+     */
+    public ClusterTopologyMonitor(Vertx vertx, TokenRingProvider 
tokenRingProvider)
+    {
+        this.vertx = vertx;
+        this.tokenRingProvider = tokenRingProvider;
+        try
+        {
+            
vertx.eventBus().registerDefaultCodec(DcLocalTopologyChangeEvent.class, 
DcLocalTopologyChangeEventCodec.INSTANCE);
+        }
+        catch (IllegalStateException ise)
+        {
+            if (ise.getMessage().contains("Already a default codec 
registered"))
+            {
+                // TODO: do not register codec in the constructor when OSS; 
All codecs should be managed in a central repository
+                // ignore
+            }
+            else
+            {
+                throw ise;
+            }
+        }
+    }
+
+    /**
+     * Returns the delay between topology refresh cycles.
+     */
+    @Override
+    public DurationSpec delay()
+    {
+//        return 
MillisecondBoundConfiguration.parse(sidecarConfiguration.serviceConfiguration().refreshClusterTopology());
+        return MillisecondBoundConfiguration.parse("1000");
+    }
+
+    /**
+     * Executes the topology monitoring task.
+     */
+    @Override
+    public void execute(Promise<Void> promise)
+    {
+        try
+        {
+            refresh();
+        }
+        catch (Exception e)
+        {
+            LOGGER.error("Unexpected error in ClusterTopologyMonitor", e);
+        }
+        finally
+        {
+            promise.complete();
+        }
+    }
+
+    /**
+     * Refreshes topology information for all datacenters and publishes change 
events.
+     */
+    protected void refresh()
+    {
+        for (String dc : tokenRingProvider.dcs())
+        {
+            @Nullable final Map<String, List<TokenRange>> prev = 
perDcRanges.get(dc);

Review Comment:
   We might not need `@nullable` for variable and  also noticed that other 
classes do not have final for local variables but this class seems to have 
final for local variable. Please remove them for consistency across this file.



##########
server/src/test/java/org/apache/cassandra/sidecar/cdc/CdcConfigImplTest.java:
##########
@@ -150,103 +137,11 @@ void testConfigsWhenConfigsAreNotEmpty()
         assertThat(cdcConfig.persistDelay()).isEqualTo(new 
MillisecondBoundConfiguration(5, TimeUnit.SECONDS));
     }
 
-    @Test
-    void testConfigChanged() throws Exception

Review Comment:
   Why did you remove this test? I think this is valuable to have a test for 
checking callback and config updating when config is changed.



##########
server/src/main/java/org/apache/cassandra/sidecar/db/TableHistoryDatabaseAccessor.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.sidecar.db;
+
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
+import org.apache.cassandra.sidecar.db.schema.TableHistorySchema;
+
+/**
+ * Database accessor for Table History operations.
+ */
+@SuppressWarnings("resource")
+@Singleton
+public class TableHistoryDatabaseAccessor extends 
DatabaseAccessor<TableHistorySchema>
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TableHistoryDatabaseAccessor.class);
+
+    @Inject
+    public TableHistoryDatabaseAccessor(SidecarSchema sidecarSchema,
+                                        CQLSessionProvider sessionProvider)
+    {
+        super(sidecarSchema.tableSchema(TableHistorySchema.class), 
sessionProvider);
+    }
+
+    public ResultSetFuture insertTableSchemaHistory(String keyspace, String 
tableName, String schema)
+    {
+        UUID schemaUuid = 
UUID.nameUUIDFromBytes(schema.getBytes(StandardCharsets.UTF_8));
+        return session().executeAsync(tableSchema
+                                      .insertTableSchema()
+                                      .bind(keyspace, tableName, schemaUuid, 
schema));
+    }
+
+    public String tableSchemaFromVersion(String keyspace, String tableName, 
String version)

Review Comment:
   Unused method.



##########
server/src/main/java/org/apache/cassandra/sidecar/modules/CdcModule.java:
##########
@@ -269,4 +326,156 @@ public SidecarCdcStats sidecarCdcStats()
         {
         };
     }
+
+    @Provides
+    @Singleton
+    public Serializer<CdcEvent> getSerializer(CachingSchemaStore schemaStore,
+                                              InstanceMetadataFetcher 
instanceMetadataFetcher,
+                                              CassandraBridgeFactory 
cassandraBridgeFactory)
+    {
+        return new CdcAvroSerializer(schemaStore, instanceMetadataFetcher, 
cassandraBridgeFactory);
+    }
+
+    @Provides
+    @Singleton
+    public SchemaSupplier schemaSupplier(InstanceMetadataFetcher 
instanceMetadataFetcher,
+                                         CassandraBridgeFactory 
cassandraBridgeFactory,
+                                         CdcDatabaseAccessor 
cdcDatabaseAccessor)
+    {
+        return new CdcSchemaSupplier(instanceMetadataFetcher, 
cassandraBridgeFactory, cdcDatabaseAccessor);
+    }
+
+    @Provides
+    @Singleton
+    public ClusterConfigProvider clusterConfigProvider(InstanceMetadataFetcher 
instanceMetadataFetcher, CassandraBridgeFactory cassandraBridgeFactory)
+    {
+        return new ClusterConfigProvider()
+        {
+            public String dc()
+            {
+                NodeSettings nodeSettings = 
instanceMetadataFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+                return nodeSettings.datacenter();
+            }
+
+            public Set<CassandraInstance> getCluster()
+            {
+                Set<Host> hosts = 
instanceMetadataFetcher.callOnFirstAvailableInstance(instance ->
+                                                                               
        instance.delegate().metadata().getAllHosts());
+                return hosts.stream()
+                            .filter(host -> host.getListenAddress() != null)
+                            .flatMap(host -> host.getTokens().stream()
+                                                 .map(token -> new 
CassandraInstance(
+                                                 token.toString(),
+                                                 
host.getEndPoint().resolve().getHostName(),
+                                                 host.getDatacenter()
+                                                 ))
+                            ).collect(Collectors.toSet());
+            }
+
+            public Partitioner partitioner()
+            {
+                NodeSettings nodeSettings = 
instanceMetadataFetcher.callOnFirstAvailableInstance(instance-> 
instance.delegate().nodeSettings());
+                String[] parts = nodeSettings.partitioner().split("\\.");
+                return Partitioner.valueOf(parts[parts.length - 1]);
+            }
+        };

Review Comment:
   How about moving this to a separate class for better readability?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to