yifan-c commented on code in PR #45: URL: https://github.com/apache/cassandra-sidecar/pull/45#discussion_r1194167826
########## client/src/test/java/org/apache/cassandra/sidecar/client/retry/BasicRetryPolicyTest.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.retry; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.cassandra.sidecar.client.HttpResponse; +import org.apache.cassandra.sidecar.client.exception.ResourceNotFoundException; +import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; +import org.apache.cassandra.sidecar.client.exception.UnexpectedStatusCodeException; +import org.apache.cassandra.sidecar.client.request.Request; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_GATEWAY; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.EXPECTATION_FAILED; +import static io.netty.handler.codec.http.HttpResponseStatus.FAILED_DEPENDENCY; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.GATEWAY_TIMEOUT; +import static io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED; +import static io.netty.handler.codec.http.HttpResponseStatus.INSUFFICIENT_STORAGE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.LENGTH_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.LOCKED; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.MISDIRECTED_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.NETWORK_AUTHENTICATION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_ACCEPTABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_EXTENDED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_IMPLEMENTED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpResponseStatus.PAYMENT_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_FAILED; +import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_TIMEOUT; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_URI_TOO_LONG; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; +import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; +import static io.netty.handler.codec.http.HttpResponseStatus.UNPROCESSABLE_ENTITY; +import static io.netty.handler.codec.http.HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.UPGRADE_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.VARIANT_ALSO_NEGOTIATES; +import static io.netty.handler.codec.rtsp.RtspResponseStatuses.REQUEST_ENTITY_TOO_LARGE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Fail.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link BasicRetryPolicy} + */ +class BasicRetryPolicyTest +{ + HttpResponse mockResponse; + Request mockRequest; + RetryPolicy defaultBasicRetryPolicy; + Map<String, List<String>> headersMap; + + @BeforeEach + void setup() + { + defaultBasicRetryPolicy = new BasicRetryPolicy(); + headersMap = new HashMap<>(); + mockResponse = mock(HttpResponse.class); + mockRequest = mock(Request.class); + when(mockResponse.headers()).thenReturn(headersMap); + when(mockRequest.requestURI()).thenReturn("/api/uri"); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testRetriesWhenThrowableIsProvided(boolean canRetryOnADifferentHost) + { + IllegalArgumentException throwable = new IllegalArgumentException("Connection Refused"); + testWithRetries(mockRequest, null, throwable, 3, 100, canRetryOnADifferentHost); + } + + @Test + void testRetriesOnInternalServerErrorStatusCode() + { + when(mockResponse.statusCode()).thenReturn(INTERNAL_SERVER_ERROR.code()); + testWithRetries(mockRequest, mockResponse, null, 5, 200, false); + testWithRetries(mockRequest, mockResponse, null, 5, 200, true); + } + + @Test + void testCompletesWithOKStatusCode() throws ExecutionException, InterruptedException + { + when(mockResponse.statusCode()).thenReturn(OK.code()); + + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + defaultBasicRetryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, false, + (attempts, retryDelayMillis) -> fail("Should never retry")); + future.join(); + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isSameAs(mockResponse); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testCompletesWithNotFoundStatusCode(boolean canRetryOnADifferentHost) Review Comment: 👍 ########## client/src/test/java/org/apache/cassandra/sidecar/client/retry/RunnableOnStatusCodeRetryPolicyTest.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.retry; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.cassandra.sidecar.client.HttpResponse; +import org.apache.cassandra.sidecar.client.request.Request; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Fail.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link RunnableOnStatusCodeRetryPolicy} class + */ +class RunnableOnStatusCodeRetryPolicyTest +{ + private static final Logger LOGGER = LoggerFactory.getLogger(RunnableOnStatusCodeRetryPolicyTest.class); Review Comment: not used. ########## client/src/test/java/org/apache/cassandra/sidecar/client/selection/OrderedInstanceSelectionPolicyTest.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.selection; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.client.SidecarInstance; +import org.apache.cassandra.sidecar.client.SidecarInstancesProvider; +import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for the {@link OrderedInstanceSelectionPolicy} + */ +class OrderedInstanceSelectionPolicyTest +{ + List<SidecarInstance> mockInstanceList; + SidecarInstance mockInstance1; + SidecarInstance mockInstance2; + SidecarInstance mockInstance3; + SidecarInstance mockInstance4; + + @BeforeEach + void setup() + { + mockInstance1 = mock(SidecarInstance.class); + mockInstance2 = mock(SidecarInstance.class); + mockInstance3 = mock(SidecarInstance.class); + mockInstance4 = mock(SidecarInstance.class); + mockInstanceList = Arrays.asList(mockInstance1, mockInstance2, mockInstance3, mockInstance4); + } + + @Test + public void testIterator() + { + + SidecarInstancesProvider provider = new SimpleSidecarInstancesProvider(mockInstanceList); + InstanceSelectionPolicy instanceSelectionPolicy = new OrderedInstanceSelectionPolicy(provider); + Iterator<SidecarInstance> iterator = instanceSelectionPolicy.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.hasNext()).isTrue().as("Expected to be true"); Review Comment: nit: duplication ########## client/src/test/java/org/apache/cassandra/sidecar/client/selection/RandomInstanceSelectionPolicyTest.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.selection; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.client.SidecarInstance; +import org.apache.cassandra.sidecar.client.SidecarInstancesProvider; +import org.apache.cassandra.sidecar.client.SimpleSidecarInstancesProvider; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; + +/** + * Unit tests for the {@link RandomInstanceSelectionPolicy} + */ +class RandomInstanceSelectionPolicyTest +{ + List<SidecarInstance> mockInstanceList; + + @BeforeEach + void setup() + { + mockInstanceList = Arrays.asList(mock(SidecarInstance.class), mock(SidecarInstance.class), + mock(SidecarInstance.class), mock(SidecarInstance.class) + ); + } + + @Test + public void testIterator() + { + SidecarInstancesProvider provider = new SimpleSidecarInstancesProvider(mockInstanceList); + InstanceSelectionPolicy instanceSelectionPolicy = new RandomInstanceSelectionPolicy(provider); + Iterator<SidecarInstance> iterator = instanceSelectionPolicy.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.hasNext()).isTrue().as("Expected to be true"); Review Comment: nit: duplication ########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/request/RingRequestForKeyspaceTestParameters.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.request; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.cassandra.sidecar.client.RequestContext; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.data.RingEntry; +import org.apache.cassandra.sidecar.common.data.RingResponse; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for client requests accessing the ring endpoint for a given keyspace + */ +public class RingRequestForKeyspaceTestParameters implements RequestTestParameters<RingResponse> +{ + public static final String KEYSPACE = "tutorialspoint"; Review Comment: hmm... it seems to be the name of a website? ########## common/src/main/java/org/apache/cassandra/sidecar/common/data/GossipInfoResponse.java: ########## @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.data; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.DC; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.DISK_USAGE; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.GENERATION; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.HEARTBEAT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.HOST_ID; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.INTERNAL_ADDRESS_AND_PORT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.INTERNAL_IP; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.LOAD; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.NATIVE_ADDRESS_AND_PORT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.NET_VERSION; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RACK; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RELEASE_VERSION; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.REMOVAL_COORDINATOR; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RPC_ADDRESS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RPC_READY; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.SCHEMA; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.SEVERITY; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.SSTABLE_VERSIONS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.STATUS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.STATUS_WITH_PORT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.TOKENS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.read; + +/** + * A class representing the response of the cassandra gossip endpoint + */ +public class GossipInfoResponse extends HashMap<String, GossipInfoResponse.GossipInfo> +{ + /** + * Data accessor for reading Gossip states + */ + public static class GossipInfo extends HashMap<String, String> + { + /** + * Converts the key, if it is using the UPPER UNDERSCORE format, into camel case. + * Then, put the new key and value. + * + * @param snakeCaseKey the key to convert + * @param value the value for the gossip info entry + */ + public void camelizeKeyAndPut(String snakeCaseKey, String value) + { + String key = toLowerCamelCase(snakeCaseKey.toUpperCase()); + super.put(key, value); + } + + @NotNull + public String generation() + { + return read(this, GENERATION); + } + + @NotNull + public String heartbeat() + { + return read(this, HEARTBEAT); + } + + @Nullable + public String status() + { + return read(this, STATUS); + } + + @Nullable + public String load() + { + return read(this, LOAD); + } + + /** + * @return schema version, uuid string + */ + @Nullable + public String schema() + { + return read(this, SCHEMA); + } + + /** + * @return datacenter name + */ + @Nullable + public String datacenter() + { + return read(this, DC); + } + + /** + * @return rack name + */ + @Nullable + public String rack() + { + return read(this, RACK); + } + + @Nullable + public String releaseVersion() + { + return read(this, RELEASE_VERSION); + } + + @Nullable + public String removalCoordinator() + { + return read(this, REMOVAL_COORDINATOR); + } + + @Nullable + public String internalIp() + { + return read(this, INTERNAL_IP); + } + + @Nullable + public String rpcAddress() + { + return read(this, RPC_ADDRESS); + } + + @Nullable + public String severity() + { + return read(this, SEVERITY); + } + + @Nullable + public String netVersion() + { + return read(this, NET_VERSION); + } + + @Nullable + public String hostId() + { + return read(this, HOST_ID); + } + + @Nullable + public String tokens() + { + return read(this, TOKENS); + } + + @Nullable + public Boolean rpcReady() + { + String value = read(this, RPC_READY); + return value == null ? null : Boolean.valueOf(value); + } + + @Nullable + public String internalAddressAndPort() + { + return read(this, INTERNAL_ADDRESS_AND_PORT); + } + + @Nullable + public String nativeAddressAndPort() + { + return read(this, NATIVE_ADDRESS_AND_PORT); + } + + @Nullable + public String statusWithPort() + { + return read(this, STATUS_WITH_PORT); + } + + @Nullable + public List<String> sstableVersions() + { + String value = read(this, SSTABLE_VERSIONS); + return value == null ? null : Arrays.asList(value.split(",")); + } + + @Nullable + public String diskUsage() + { + return read(this, DISK_USAGE); + } + } + + /** + * Declares all fields that gossip info can possibly contain. + * + * <p>Note: When adding a new field, make sure there is a pairing access method defined in {@link GossipInfo}. + */ + protected enum GossipField + { + GENERATION, + HEARTBEAT, + + // Below are copied from org.apache.cassandra.gms.ApplicationState + // Note: that all padding fields are not included. + @Deprecated STATUS, //Deprecated and unsued in 4.0, stop publishing in 5.0, reclaim in 6.0 Review Comment: ```suggestion @Deprecated STATUS, //Deprecated and unused in 4.0, stop publishing in 5.0, reclaim in 6.0 ``` ########## client/src/test/java/org/apache/cassandra/sidecar/client/retry/ExponentialBackoffRetryPolicyTest.java: ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.retry; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.cassandra.sidecar.client.HttpResponse; +import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; +import org.apache.cassandra.sidecar.client.request.Request; + +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Fail.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ExponentialBackoffRetryPolicyTest +{ + Request mockRequest; + HttpResponse mockResponse; + + @BeforeEach + public void setup() + { + mockRequest = mock(Request.class); + mockResponse = mock(HttpResponse.class); + when(mockRequest.requestURI()).thenReturn("/api/uri"); + } + + @Test + void testOverflows() + { + long delay = new ExponentialBackoffRetryPolicy(1, 1, 0) + .retryDelayMillis(66); // 2 ^ 63 + 1 overflows Review Comment: It should be `64` instead of `66`. `Long.MAX_VALUE == 2 ^ 63`. So to trigger the current overflow check (which is not complete), the input attempts should be 63 + 1 = 64. The reason why the current check is not complete can be demoed in this test, which fails. ```java delay = new ExponentialBackoffRetryPolicy(1, 8, 0) .retryDelayMillis(63); // 2 ^ 62 does not overflow, but 2 ^ 62 * 8 overflows. It should cap the value at MAX_VALUE. assertThat(delay).isEqualTo(Long.MAX_VALUE); ``` ########## client/src/test/java/org/apache/cassandra/sidecar/client/selection/SingleInstanceSelectionPolicyTest.java: ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.selection; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.sidecar.client.SidecarInstance; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; + +class SingleInstanceSelectionPolicyTest +{ + SidecarInstance mockSidecarInstance; + + @BeforeEach + void setup() + { + mockSidecarInstance = mock(SidecarInstance.class); + } + + @Test + void testIterator() + { + InstanceSelectionPolicy instanceSelectionPolicy = new SingleInstanceSelectionPolicy(mockSidecarInstance); + Iterator<SidecarInstance> iterator = instanceSelectionPolicy.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.hasNext()).isTrue().as("Expected to be true"); Review Comment: nit: duplication ########## client/src/testFixtures/java/org/apache/cassandra/sidecar/client/request/GossipInfoRequestTestParameters.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.request; + +import org.apache.cassandra.sidecar.client.RequestContext; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.data.GossipInfoResponse; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for client requests accessing the gossip info endpoint + */ +public class GossipInfoRequestTestParameters implements RequestTestParameters<GossipInfoResponse> +{ + @Override + public RequestContext.Builder specificRequest(RequestContext.Builder requestContextBuilder) + { + return requestContextBuilder.gossipInfoRequest(); + } + + @Override + public String okResponseBody() + { + return "{\"/127.0.0.3:7000\":{\"generation\":\"1679961466\"," + + "\"schema\":\"4994b214-7a05-35ea-a11c-d092a554dd5d\",\"rack\":\"101000301\"," + + "\"heartbeat\":\"2114\",\"releaseVersion\":\"4.0.8\"," + + "\"hostId\":\"be19c254-becb-40b9-8951-30c589c7028e\",\"nativeAddressAndPort\":\"127.0.0.3:9042\"," + + "\"load\":\"89083.0\",\"sstableVersions\":\"big-nb\",\"tokens\":\"<hidden>\",\"rpcReady\":\"true\"," + + "\"dc\":\"LO\",\"netVersion\":\"12\",\"statusWithPort\":\"NORMAL,3074457345618258602\"}," + + "\"/127.0.0.1:7000\":{\"generation\":\"1679961466\"," + + "\"schema\":\"4994b214-7a05-35ea-a11c-d092a554dd5d\",\"rack\":\"101000101\",\"heartbeat\":\"2108\"," + + "\"releaseVersion\":\"4.0.8\",\"rpcAddress\":\"172.17.0.2\"," + + "\"hostId\":\"33cae238-8203-41c1-880f-8cdf98ee6720\",\"nativeAddressAndPort\":\"172.17.0.2:9042\"," + + "\"load\":\"89082.0\",\"sstableVersions\":\"big-nb\",\"tokens\":\"<hidden>\",\"rpcReady\":\"true\"," + + "\"status\":\"NORMAL,-9223372036854775808\",\"dc\":\"LO\",\"netVersion\":\"12\",\"statusWithPort\":" + + "\"NORMAL,-9223372036854775808\"},\"/127.0.0.2:7000\":{\"generation\":\"1679961465\"," + + "\"schema\":\"4994b214-7a05-35ea-a11c-d092a554dd5d\",\"rack\":\"101000201\",\"heartbeat\":\"2108\"," + + "\"releaseVersion\":\"4.0.8\",\"hostId\":\"dba02656-ea8c-4a1d-8011-cbc0dab5f411\"," + + "\"nativeAddressAndPort\":\"127.0.0.2:9042\",\"load\":\"89084.0\",\"sstableVersions\":\"big-nb\"," + + "\"tokens\":\"<hidden>\",\"rpcReady\":\"true\",\"dc\":\"LO\",\"netVersion\":\"12\"," + + "\"statusWithPort\":\"NORMAL,-3074457345618258603\"}}[root@b77ac3582e25 /]"; Review Comment: `[root@b77ac3582e25 /]`?? Btw, the nit on preparing the response body is to build a map and serialize into json. This way, the code reads more structured, instead of a large block of string. Feel free to ignore the nit though. ########## common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java: ########## @@ -145,47 +124,73 @@ private void healthCheckInternal() Row oneResult = activeSession.execute("select release_version, partitioner from system.local") .one(); - String releaseVersion = oneResult.getString("release_version"); - // update the nodeSettings cache. // Note that within the scope of this method, we should keep on using the local releaseVersion - nodeSettings = new NodeSettings(releaseVersion, oneResult.getString("partitioner")); - // this might swap the adapter out - SimpleCassandraVersion newVersion = SimpleCassandraVersion.create(releaseVersion); - if (!newVersion.equals(currentVersion)) + String releaseVersion = oneResult.getString("release_version"); + NodeSettings newNodeSettings = new NodeSettings(releaseVersion, oneResult.getString("partitioner")); + if (!newNodeSettings.equals(nodeSettings)) { - currentVersion = newVersion; - adapter = versionProvider.getCassandra(nodeSettings.releaseVersion()).create(cqlSession, jmxClient); - logger.info("Cassandra version change detected. New adapter loaded: {}", adapter); + // update the nodeSettings cache. + SimpleCassandraVersion previousVersion = currentVersion; + currentVersion = SimpleCassandraVersion.create(releaseVersion); + adapter = versionProvider.cassandra(releaseVersion) + .create(cqlSessionProvider, jmxClient); + nodeSettings = newNodeSettings; Review Comment: There is a potential race that set `nodeSettings` to an unexpected value. But since it runs periodically, it should be corrected in the following runs. So.. I think it is acceptable. ########## client/src/test/java/org/apache/cassandra/sidecar/client/retry/BasicRetryPolicyTest.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.retry; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.cassandra.sidecar.client.HttpResponse; +import org.apache.cassandra.sidecar.client.exception.ResourceNotFoundException; +import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; +import org.apache.cassandra.sidecar.client.exception.UnexpectedStatusCodeException; +import org.apache.cassandra.sidecar.client.request.Request; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_GATEWAY; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.EXPECTATION_FAILED; +import static io.netty.handler.codec.http.HttpResponseStatus.FAILED_DEPENDENCY; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.GATEWAY_TIMEOUT; +import static io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED; +import static io.netty.handler.codec.http.HttpResponseStatus.INSUFFICIENT_STORAGE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.LENGTH_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.LOCKED; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.MISDIRECTED_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.NETWORK_AUTHENTICATION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_ACCEPTABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_EXTENDED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_IMPLEMENTED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpResponseStatus.PAYMENT_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_FAILED; +import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_TIMEOUT; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_URI_TOO_LONG; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; +import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; +import static io.netty.handler.codec.http.HttpResponseStatus.UNPROCESSABLE_ENTITY; +import static io.netty.handler.codec.http.HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.UPGRADE_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.VARIANT_ALSO_NEGOTIATES; +import static io.netty.handler.codec.rtsp.RtspResponseStatuses.REQUEST_ENTITY_TOO_LARGE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Fail.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link BasicRetryPolicy} + */ +class BasicRetryPolicyTest +{ + HttpResponse mockResponse; + Request mockRequest; + RetryPolicy defaultBasicRetryPolicy; + Map<String, List<String>> headersMap; + + @BeforeEach + void setup() + { + defaultBasicRetryPolicy = new BasicRetryPolicy(); + headersMap = new HashMap<>(); + mockResponse = mock(HttpResponse.class); + mockRequest = mock(Request.class); + when(mockResponse.headers()).thenReturn(headersMap); + when(mockRequest.requestURI()).thenReturn("/api/uri"); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testRetriesWhenThrowableIsProvided(boolean canRetryOnADifferentHost) + { + IllegalArgumentException throwable = new IllegalArgumentException("Connection Refused"); + testWithRetries(mockRequest, null, throwable, 3, 100, canRetryOnADifferentHost); + } + + @Test + void testRetriesOnInternalServerErrorStatusCode() + { + when(mockResponse.statusCode()).thenReturn(INTERNAL_SERVER_ERROR.code()); + testWithRetries(mockRequest, mockResponse, null, 5, 200, false); + testWithRetries(mockRequest, mockResponse, null, 5, 200, true); + } + + @Test + void testCompletesWithOKStatusCode() throws ExecutionException, InterruptedException + { + when(mockResponse.statusCode()).thenReturn(OK.code()); + + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + defaultBasicRetryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, false, + (attempts, retryDelayMillis) -> fail("Should never retry")); + future.join(); + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isSameAs(mockResponse); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testCompletesWithNotFoundStatusCode(boolean canRetryOnADifferentHost) + throws ExecutionException, InterruptedException + { + when(mockResponse.statusCode()).thenReturn(NOT_FOUND.code()); + + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + if (canRetryOnADifferentHost) + { + testWithRetries(mockRequest, mockResponse, null, 3, 50, true); + } + else + { + defaultBasicRetryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, canRetryOnADifferentHost, + (attempts, retryDelayMillis) -> fail("Should never retry")); + assertThatExceptionOfType(CompletionException.class) + .isThrownBy(future::join) + .withCauseInstanceOf(ResourceNotFoundException.class); + } + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testRetriesWithNotImplementedStatusCode(boolean canRetryOnADifferentHost) + { + when(mockResponse.statusCode()).thenReturn(NOT_IMPLEMENTED.code()); + + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + defaultBasicRetryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, canRetryOnADifferentHost, + (attempts, retryDelayMillis) -> fail("Should never retry")); + + assertThatExceptionOfType(CompletionException.class) + .isThrownBy(future::join) + .withCauseInstanceOf(UnsupportedOperationException.class); + assertThat(future.isCompletedExceptionally()).isTrue(); + } + + @Test + void testRetriesWithNotAcceptableStatusCode() + { + when(mockResponse.statusCode()).thenReturn(NOT_ACCEPTABLE.code()); + testWithRetries(mockRequest, mockResponse, null, 8, 50, true); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testRetriesWithServiceUnavailableStatusCode(boolean canRetryOnADifferentHost) + { + when(mockResponse.statusCode()).thenReturn(SERVICE_UNAVAILABLE.code()); + testWithRetries(mockRequest, mockResponse, null, 5, 300, canRetryOnADifferentHost); + } + + @Test + void testRetriesWithServiceUnavailableStatusCodeWithRetryAfterHeader() + { + when(mockResponse.statusCode()).thenReturn(SERVICE_UNAVAILABLE.code()); + headersMap.put("Retry-After", Collections.singletonList("5")); // 5 seconds -> 5,000 millis + testWithRetries(mockRequest, mockResponse, null, 5, 1000, 5000, false); + testWithRetries(mockRequest, mockResponse, null, 5, 1000, 0, true); + } + + @Test + void testRetriesWithServiceUnavailableStatusCodeWithInvalidRetryAfterHeader() + { + when(mockResponse.statusCode()).thenReturn(SERVICE_UNAVAILABLE.code()); + headersMap.put("Retry-After", Collections.singletonList("one-thousand-seconds")); + testWithRetries(mockRequest, mockResponse, null, 5, 200, 200, false); + testWithRetries(mockRequest, mockResponse, null, 5, 200, 0, true); + } + + @Test + void testRetriesWithAcceptedStatusCode() throws ExecutionException, InterruptedException + { + when(mockResponse.statusCode()).thenReturn(ACCEPTED.code()); + RetryPolicy retryPolicy = new BasicRetryPolicy(1); + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + AtomicInteger retryActionCalls = new AtomicInteger(0); + int maxRetries = 50; + for (int currentAttempt = 1; currentAttempt <= maxRetries; currentAttempt++) + { + retryPolicy.onResponse(future, mockRequest, mockResponse, null, currentAttempt, true, + (attempts, retryDelayMillis) -> { + retryActionCalls.incrementAndGet(); + assertThat(attempts).isEqualTo(1); + assertThat(retryDelayMillis).isEqualTo(0); + }); + } + + when(mockResponse.statusCode()).thenReturn(OK.code()); + + retryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, false, + (attempts, retryDelayMillis) -> fail("Should not retry")); + future.join(); + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isSameAs(mockResponse); + assertThat(retryActionCalls.get()).isEqualTo(maxRetries); + } + + private static Stream<Arguments> clientStatusCodeArguments() + { + return Stream.of( + Arguments.of(BAD_REQUEST.code()), + Arguments.of(UNAUTHORIZED.code()), + Arguments.of(PAYMENT_REQUIRED.code()), + Arguments.of(FORBIDDEN.code()), + Arguments.of(METHOD_NOT_ALLOWED.code()), + Arguments.of(NOT_ACCEPTABLE.code()), + Arguments.of(PROXY_AUTHENTICATION_REQUIRED.code()), + Arguments.of(REQUEST_TIMEOUT.code()), + Arguments.of(CONFLICT.code()), + Arguments.of(GONE.code()), + Arguments.of(LENGTH_REQUIRED.code()), + Arguments.of(PRECONDITION_FAILED.code()), + Arguments.of(REQUEST_ENTITY_TOO_LARGE.code()), + Arguments.of(REQUEST_URI_TOO_LONG.code()), + Arguments.of(UNSUPPORTED_MEDIA_TYPE.code()), + Arguments.of(REQUESTED_RANGE_NOT_SATISFIABLE.code()), + Arguments.of(EXPECTATION_FAILED.code()), + Arguments.of(MISDIRECTED_REQUEST.code()), + Arguments.of(UNPROCESSABLE_ENTITY.code()), + Arguments.of(LOCKED.code()), + Arguments.of(FAILED_DEPENDENCY.code()), + Arguments.of(UPGRADE_REQUIRED.code()), + Arguments.of(PRECONDITION_REQUIRED.code()), + Arguments.of(TOO_MANY_REQUESTS.code()), + Arguments.of(REQUEST_HEADER_FIELDS_TOO_LARGE.code()) + ); + } + + @ParameterizedTest(name = "{index} => statusCode={0}") + @MethodSource("clientStatusCodeArguments") + void testRetriesWithClientError(int statusCode) + { + when(mockResponse.statusCode()).thenReturn(statusCode); + testWithRetries(mockRequest, mockResponse, null, 2, 200, 200, false, false); + testWithRetries(mockRequest, mockResponse, null, 2, 200, 0, true, false); + } + + private static Stream<Arguments> serverStatusCodeArguments() Review Comment: same for serve error, `5xx` ########## common/src/main/java/org/apache/cassandra/sidecar/common/data/GossipInfoResponse.java: ########## @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.data; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.DC; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.DISK_USAGE; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.GENERATION; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.HEARTBEAT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.HOST_ID; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.INTERNAL_ADDRESS_AND_PORT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.INTERNAL_IP; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.LOAD; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.NATIVE_ADDRESS_AND_PORT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.NET_VERSION; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RACK; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RELEASE_VERSION; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.REMOVAL_COORDINATOR; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RPC_ADDRESS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.RPC_READY; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.SCHEMA; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.SEVERITY; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.SSTABLE_VERSIONS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.STATUS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.STATUS_WITH_PORT; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.TOKENS; +import static org.apache.cassandra.sidecar.common.data.GossipInfoResponse.GossipField.read; + +/** + * A class representing the response of the cassandra gossip endpoint + */ +public class GossipInfoResponse extends HashMap<String, GossipInfoResponse.GossipInfo> +{ + /** + * Data accessor for reading Gossip states + */ + public static class GossipInfo extends HashMap<String, String> + { + /** + * Converts the key, if it is using the UPPER UNDERSCORE format, into camel case. + * Then, put the new key and value. + * + * @param snakeCaseKey the key to convert + * @param value the value for the gossip info entry + */ + public void camelizeKeyAndPut(String snakeCaseKey, String value) + { + String key = toLowerCamelCase(snakeCaseKey.toUpperCase()); + super.put(key, value); + } Review Comment: How do you make sure the input `String snakeCaseKey` is indeed in snake case. I would suggest to convert the value into enum `GossipField` first by looking up using `valueOf()`, and relocate the `toLowerCamelCase()` to the enum, which simplifies the implementation a bit, since validation for null or empty is no longer required. ```java // in protected enum GossipField String toLowerCamelCase() { String upperUnderscoreCase = name(); StringBuilder sb = new StringBuilder(upperUnderscoreCase.length()); sb.append(Character.toLowerCase(upperUnderscoreCase.charAt(0))); int length = upperUnderscoreCase.length(); for (int i = 1; i < length; i++) { if (upperUnderscoreCase.charAt(i) == '_' && i + 1 < length) { i++; sb.append(upperUnderscoreCase.charAt(i)); } else { sb.append(Character.toLowerCase(upperUnderscoreCase.charAt(i))); } } return sb.toString(); } ``` ########## client/src/test/java/org/apache/cassandra/sidecar/client/retry/BasicRetryPolicyTest.java: ########## @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.client.retry; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import org.apache.cassandra.sidecar.client.HttpResponse; +import org.apache.cassandra.sidecar.client.exception.ResourceNotFoundException; +import org.apache.cassandra.sidecar.client.exception.RetriesExhaustedException; +import org.apache.cassandra.sidecar.client.exception.UnexpectedStatusCodeException; +import org.apache.cassandra.sidecar.client.request.Request; + +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_GATEWAY; +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.CONFLICT; +import static io.netty.handler.codec.http.HttpResponseStatus.EXPECTATION_FAILED; +import static io.netty.handler.codec.http.HttpResponseStatus.FAILED_DEPENDENCY; +import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN; +import static io.netty.handler.codec.http.HttpResponseStatus.GATEWAY_TIMEOUT; +import static io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED; +import static io.netty.handler.codec.http.HttpResponseStatus.INSUFFICIENT_STORAGE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.LENGTH_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.LOCKED; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.MISDIRECTED_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.NETWORK_AUTHENTICATION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_ACCEPTABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_EXTENDED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_IMPLEMENTED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpResponseStatus.PAYMENT_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_FAILED; +import static io.netty.handler.codec.http.HttpResponseStatus.PRECONDITION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.PROXY_AUTHENTICATION_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_TIMEOUT; +import static io.netty.handler.codec.http.HttpResponseStatus.REQUEST_URI_TOO_LONG; +import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE; +import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS; +import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; +import static io.netty.handler.codec.http.HttpResponseStatus.UNPROCESSABLE_ENTITY; +import static io.netty.handler.codec.http.HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.UPGRADE_REQUIRED; +import static io.netty.handler.codec.http.HttpResponseStatus.VARIANT_ALSO_NEGOTIATES; +import static io.netty.handler.codec.rtsp.RtspResponseStatuses.REQUEST_ENTITY_TOO_LARGE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Fail.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link BasicRetryPolicy} + */ +class BasicRetryPolicyTest +{ + HttpResponse mockResponse; + Request mockRequest; + RetryPolicy defaultBasicRetryPolicy; + Map<String, List<String>> headersMap; + + @BeforeEach + void setup() + { + defaultBasicRetryPolicy = new BasicRetryPolicy(); + headersMap = new HashMap<>(); + mockResponse = mock(HttpResponse.class); + mockRequest = mock(Request.class); + when(mockResponse.headers()).thenReturn(headersMap); + when(mockRequest.requestURI()).thenReturn("/api/uri"); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testRetriesWhenThrowableIsProvided(boolean canRetryOnADifferentHost) + { + IllegalArgumentException throwable = new IllegalArgumentException("Connection Refused"); + testWithRetries(mockRequest, null, throwable, 3, 100, canRetryOnADifferentHost); + } + + @Test + void testRetriesOnInternalServerErrorStatusCode() + { + when(mockResponse.statusCode()).thenReturn(INTERNAL_SERVER_ERROR.code()); + testWithRetries(mockRequest, mockResponse, null, 5, 200, false); + testWithRetries(mockRequest, mockResponse, null, 5, 200, true); + } + + @Test + void testCompletesWithOKStatusCode() throws ExecutionException, InterruptedException + { + when(mockResponse.statusCode()).thenReturn(OK.code()); + + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + defaultBasicRetryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, false, + (attempts, retryDelayMillis) -> fail("Should never retry")); + future.join(); + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isSameAs(mockResponse); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testCompletesWithNotFoundStatusCode(boolean canRetryOnADifferentHost) + throws ExecutionException, InterruptedException + { + when(mockResponse.statusCode()).thenReturn(NOT_FOUND.code()); + + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + if (canRetryOnADifferentHost) + { + testWithRetries(mockRequest, mockResponse, null, 3, 50, true); + } + else + { + defaultBasicRetryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, canRetryOnADifferentHost, + (attempts, retryDelayMillis) -> fail("Should never retry")); + assertThatExceptionOfType(CompletionException.class) + .isThrownBy(future::join) + .withCauseInstanceOf(ResourceNotFoundException.class); + } + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testRetriesWithNotImplementedStatusCode(boolean canRetryOnADifferentHost) + { + when(mockResponse.statusCode()).thenReturn(NOT_IMPLEMENTED.code()); + + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + defaultBasicRetryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, canRetryOnADifferentHost, + (attempts, retryDelayMillis) -> fail("Should never retry")); + + assertThatExceptionOfType(CompletionException.class) + .isThrownBy(future::join) + .withCauseInstanceOf(UnsupportedOperationException.class); + assertThat(future.isCompletedExceptionally()).isTrue(); + } + + @Test + void testRetriesWithNotAcceptableStatusCode() + { + when(mockResponse.statusCode()).thenReturn(NOT_ACCEPTABLE.code()); + testWithRetries(mockRequest, mockResponse, null, 8, 50, true); + } + + @ParameterizedTest(name = "{index} => canRetryOnADifferentHost={0}") + @ValueSource(booleans = { true, false }) + void testRetriesWithServiceUnavailableStatusCode(boolean canRetryOnADifferentHost) + { + when(mockResponse.statusCode()).thenReturn(SERVICE_UNAVAILABLE.code()); + testWithRetries(mockRequest, mockResponse, null, 5, 300, canRetryOnADifferentHost); + } + + @Test + void testRetriesWithServiceUnavailableStatusCodeWithRetryAfterHeader() + { + when(mockResponse.statusCode()).thenReturn(SERVICE_UNAVAILABLE.code()); + headersMap.put("Retry-After", Collections.singletonList("5")); // 5 seconds -> 5,000 millis + testWithRetries(mockRequest, mockResponse, null, 5, 1000, 5000, false); + testWithRetries(mockRequest, mockResponse, null, 5, 1000, 0, true); + } + + @Test + void testRetriesWithServiceUnavailableStatusCodeWithInvalidRetryAfterHeader() + { + when(mockResponse.statusCode()).thenReturn(SERVICE_UNAVAILABLE.code()); + headersMap.put("Retry-After", Collections.singletonList("one-thousand-seconds")); + testWithRetries(mockRequest, mockResponse, null, 5, 200, 200, false); + testWithRetries(mockRequest, mockResponse, null, 5, 200, 0, true); + } + + @Test + void testRetriesWithAcceptedStatusCode() throws ExecutionException, InterruptedException + { + when(mockResponse.statusCode()).thenReturn(ACCEPTED.code()); + RetryPolicy retryPolicy = new BasicRetryPolicy(1); + CompletableFuture<HttpResponse> future = new CompletableFuture<>(); + AtomicInteger retryActionCalls = new AtomicInteger(0); + int maxRetries = 50; + for (int currentAttempt = 1; currentAttempt <= maxRetries; currentAttempt++) + { + retryPolicy.onResponse(future, mockRequest, mockResponse, null, currentAttempt, true, + (attempts, retryDelayMillis) -> { + retryActionCalls.incrementAndGet(); + assertThat(attempts).isEqualTo(1); + assertThat(retryDelayMillis).isEqualTo(0); + }); + } + + when(mockResponse.statusCode()).thenReturn(OK.code()); + + retryPolicy.onResponse(future, mockRequest, mockResponse, null, 1, false, + (attempts, retryDelayMillis) -> fail("Should not retry")); + future.join(); + assertThat(future.isDone()).isTrue(); + assertThat(future.get()).isSameAs(mockResponse); + assertThat(retryActionCalls.get()).isEqualTo(maxRetries); + } + + private static Stream<Arguments> clientStatusCodeArguments() Review Comment: nit: All `4XX` are client error. The stream returned is not the complete set. The original implementation only returns the established client error codes, which is a subset. That being said, it is rarely a good idea to have custom status code. I only suggest to cover all 4XX code from the perspective of testing, because all of them technically represent client errors, regardless of the predefined or the custom. ```suggestion private static Stream<Arguments> clientStatusCodeArguments() { return IntStream.range(400, 500).boxed().map(Arguments::of); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

