bbotella commented on code in PR #193:
URL: https://github.com/apache/cassandra-sidecar/pull/193#discussion_r1955322865


##########
client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java:
##########
@@ -544,6 +550,20 @@ public void streamCdcSegments(SidecarInstance 
sidecarInstance,
                 .build(), streamConsumer);
     }
 
+    public CompletableFuture<GetServicesConfigPayload> getServiceConfig()

Review Comment:
   Thanks for bringing this in as well



##########
server/src/main/java/org/apache/cassandra/sidecar/cdc/CdcConfig.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.cdc;
+
+import java.time.Duration;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * In memory representation of CDC and Kafka configurations from "configs" 
table in sidecar keyspace.
+ */
+public interface CdcConfig
+{
+    Logger LOGGER = LoggerFactory.getLogger(CdcConfig.class);
+
+    String DEFAULT_JOB_ID = "test-job";
+    int DEFAULT_MAX_WATERMARKER_SIZE = 400000;
+
+    CdcConfig STUB = new CdcConfig()
+    {
+    };
+
+    /**
+     * Topic format
+     */
+    enum TopicFormatType
+    {
+        STATIC, KEYSPACE, KEYSPACETABLE, TABLE, MAP
+    }
+
+    default String env()
+    {
+        return "if1";
+    }
+
+    @Nullable
+    default String kafkaTopic()
+    {
+        return null;
+    }
+
+    @NotNull
+    default TopicFormatType topicFormat()
+    {
+        return TopicFormatType.STATIC;
+    }
+
+    default boolean cdcEnabled()
+    {
+        return true;
+    }
+
+    default String jobId()
+    {
+        return DEFAULT_JOB_ID;
+    }
+
+    default Map<String, Object> kafkaConfigs()
+    {
+        return Map.of();
+    }
+
+    default Map<String, Object> cdcConfigs()
+    {
+        return Map.of();
+    }
+
+    default boolean logOnly()
+    {
+        return false;
+    }
+
+    default String dc()
+    {
+        return "DATACENTER1";
+    }
+
+    default Duration watermarkWindow()
+    {
+        return Duration.ofHours(4);
+    }
+
+    /**
+     * @return max Kafka record size in bytes. If value is non-negative then 
the KafkaPublisher will chunk larger records into multiple messages.
+     */
+    default int maxRecordSizeBytes()
+    {
+        return -1;
+    }
+
+    /**
+     * @return "zstd" to enable compression on large blobs, or null or empty 
string if disabled.
+     */
+    @Nullable
+    default String compression()
+    {
+        return null;
+    }
+
+    /**
+     * @return true if Kafka publisher should fail if Kafka client returns 
"record too large" error
+     */
+    default boolean failOnRecordTooLargeError()
+    {
+        return false;
+    }
+
+    /**
+     * @return true if Kafka publisher should fail if Kafka client returns any 
other error.
+     */
+    default boolean failOnKafkaError()
+    {
+        return true;
+    }
+
+    /**
+     * Initialization of tables and loading config takes some time, returns if 
the config
+     * is ready to be loaded or not.
+     *
+     * @return true if config is ready to be read.
+     */
+    default boolean isConfigReady()
+    {
+        return true;
+    }
+
+    default Duration minDelayBetweenMicroBatches()
+    {
+        return Duration.ofMillis(1000);

Review Comment:
   Let's adjust all these configs to use the new `DurationSpec` added by @frankgh.
   
   This should be something like:
   ```
   MillisecondBoundConfiguration.parse("1s")
   ```
   
   Same thing for the rest of the time-related configs.



##########
server/src/main/java/org/apache/cassandra/sidecar/db/schema/CdcStatesSchema.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.db.schema;
+
+import java.util.concurrent.TimeUnit;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * {@link CdcStatesSchema} holds all prepared statements needed for talking to 
Cassandra for various actions
+ * related to storing CDC State.
+ */
+@Singleton
+public class CdcStatesSchema extends TableSchema

Review Comment:
   Should this be part of this PR?



##########
client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java:
##########
@@ -1624,6 +1629,44 @@ public void onError(Throwable throwable)
         assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testsGetServiceSuccessTests() throws IOException, 
ExecutionException, InterruptedException
+    {
+        List<GetServicesConfigPayload.Service> services = new ArrayList<>();
+        Map<String, String> kafkaConfigs = new HashMap<>();
+        kafkaConfigs.put("k1", "v1");
+        kafkaConfigs.put("k2", "v2");
+        Map<String, String> cdcConfigs = new HashMap<>();
+        cdcConfigs.put("k1", "v1");
+        cdcConfigs.put("k2", "v2");
+        services.add(new GetServicesConfigPayload.Service("kafka", 
kafkaConfigs));
+        services.add(new GetServicesConfigPayload.Service("cdc", cdcConfigs));
+        GetServicesConfigPayload expectedResponse = new 
GetServicesConfigPayload(services);
+
+        MockResponse response = new MockResponse();
+        response.setResponseCode(200);
+        response.setHeader("content-type", "application/json");
+        ObjectMapper mapper = new ObjectMapper();
+        response.setBody(mapper.writeValueAsString(expectedResponse));
+        enqueue(response);
+        
assertThat(client.getServiceConfig().get()).isEqualTo(expectedResponse);
+    }
+
+    @Test
+    public void testsPutCdcServiceSuccessTests() throws IOException, 
ExecutionException, InterruptedException
+    {
+        Map<String, String> payload = new HashMap<>();
+        payload.put("testKey", "testValue");
+        PutCdcServiceConfigPayload putResponse = new 
PutCdcServiceConfigPayload(payload);
+        MockResponse response = new MockResponse();
+        response.setResponseCode(200);
+        response.setHeader("content-type", "application/json");
+        ObjectMapper mapper = new ObjectMapper();
+        response.setBody(mapper.writeValueAsString(putResponse));
+        enqueue(response);
+        assertThat(client.putCdcServiceConfig(ServiceConfig.CDC, 
payload).get()).isEqualTo(putResponse);

Review Comment:
   Maybe we could add a test with an invalid payload (or service) to check that 
we are getting 400s?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to