frankgh commented on code in PR #171:
URL: https://github.com/apache/cassandra-sidecar/pull/171#discussion_r1924457257
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/StorageOperations.java:
##########
@@ -62,6 +61,8 @@ void takeSnapshot(@NotNull String tag, @NotNull String
keyspace, @NotNull String
* @param keyspace keyspace to check the data ownership; Cassandra selects
a keyspace if null value is passed.
* @return ring view
* @throws UnknownHostException when hostname of peer Cassandra nodes
cannot be resolved
+ *
+ * TODO: refactor. Do not return http response payload object from this
layer.
Review Comment:
I think it's better to file JIRAs because these TODOs sometimes get lost in the
code
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/exceptions/SchemaUnavailableException.java:
##########
@@ -16,21 +16,21 @@
* limitations under the License.
*/
-package org.apache.cassandra.sidecar.exceptions;
+package org.apache.cassandra.sidecar.common.server.exceptions;
Review Comment:
I think it makes more sense to keep a flat package structure
across subprojects for exceptions
##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobDiscoverer.java:
##########
@@ -19,17 +19,19 @@
package org.apache.cassandra.sidecar.restore;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
Review Comment:
use `org.apache.cassandra.sidecar.common.utils.Preconditions` instead?
##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresher.java:
##########
@@ -81,21 +101,54 @@ public ScheduleDecision scheduleDecision()
return replicaByTokenRangePerKeyspace.isEmpty() ?
ScheduleDecision.SKIP : ScheduleDecision.EXECUTE;
}
+ /**
+ * Execute the periodic task to refresh the topology layout for all
registered keyspaces
+ *
+ * <p>It is synchronized as there is potential contention from {@link
#localTokenRanges(String)}
+ * @param promise a promise when the execution completes
+ */
@Override
- public void execute(Promise<Void> promise)
+ public synchronized void execute(Promise<Void> promise)
{
- executeBlocking();
+ prepareAndFetch(this::loadAll);
promise.tryComplete();
}
- public void register(RestoreJob restoreJob)
+ public Set<UUID> allRestoreJobsOfKeyspace(String keyspace)
+ {
+ return replicaByTokenRangePerKeyspace.jobsByKeyspace.get(keyspace);
+ }
+
+ public void register(RestoreJob restoreJob, RingTopologyChangeListener
listener)
{
replicaByTokenRangePerKeyspace.register(restoreJob);
+ addRingTopologyChangeListener(restoreJob.keyspaceName, listener);
+ }
+
+ public void unregister(RestoreJob restoreJob, RingTopologyChangeListener
listener)
+ {
+ boolean allRemoved =
replicaByTokenRangePerKeyspace.unregister(restoreJob);
+ if (allRemoved)
+ {
+ removeRingTopologyChangeListener(restoreJob.keyspaceName,
listener);
+ }
}
- public void unregister(RestoreJob restoreJob)
+ public void addRingTopologyChangeListener(String keyspace,
RingTopologyChangeListener listener)
{
- replicaByTokenRangePerKeyspace.unregister(restoreJob);
+ listenersByKeyspace.compute(keyspace, (k, v) -> {
+ Set<RingTopologyChangeListener> listeners = v == null ?
ConcurrentHashMap.newKeySet() : v;
+ listeners.add(listener);
+ return listeners;
+ });
Review Comment:
```suggestion
listenersByKeyspace.computeIfAbsent(keyspace, ks ->
ConcurrentHashMap.newKeySet())
.add(listener);
```
##########
server/src/test/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresherTest.java:
##########
@@ -141,10 +157,65 @@ void testLoadAndGetMapping()
Future<TokenRangeReplicasResponse> future =
replicaByTokenRangePerKeyspace.futureOf(job);
assertThat(future.isComplete()).isFalse();
assertThat(replicaByTokenRangePerKeyspace.forRestoreJob(job)).isNull();
+ assertThat(listenerValueCaptor).isEmpty();
TokenRangeReplicasResponse mockTopology =
mock(TokenRangeReplicasResponse.class);
replicaByTokenRangePerKeyspace.load(ks -> mockTopology);
assertThat(future.isComplete()).isTrue();
assertThat(future.result()).isSameAs(mockTopology);
assertThat(replicaByTokenRangePerKeyspace.forRestoreJob(job)).isSameAs(mockTopology);
+ assertThat(listenerValueCaptor).hasSize(1);
+ assertListenerCapturedValues(listenerValueCaptor.get(0),
job.keyspaceName, null, mockTopology);
+ }
+
+ @Test
+ void testTopologyChanged()
+ {
+ assertThat(listenerValueCaptor).isEmpty();
+ RestoreJob job = RestoreJobTest.createNewTestingJob(UUIDs.timeBased());
+ replicaByTokenRangePerKeyspace.futureOf(job);
+ TokenRangeReplicasResponse mockTopologyEpoch1 =
mock(TokenRangeReplicasResponse.class);
+ when(mockTopologyEpoch1.writeReplicas())
+ .thenReturn(Collections.singletonList(new
TokenRangeReplicasResponse.ReplicaInfo("1", "10", null)));
+ replicaByTokenRangePerKeyspace.load(ks -> mockTopologyEpoch1);
+ assertThat(listenerValueCaptor).hasSize(1);
+ assertListenerCapturedValues(listenerValueCaptor.get(0),
job.keyspaceName, null, mockTopologyEpoch1);
+ TokenRangeReplicasResponse mockTopologyEpoch2 =
mock(TokenRangeReplicasResponse.class);
+ when(mockTopologyEpoch1.writeReplicas())
+ .thenReturn(Collections.singletonList(new
TokenRangeReplicasResponse.ReplicaInfo("100", "110", null)));
+ replicaByTokenRangePerKeyspace.load(ks -> mockTopologyEpoch2);
+ assertThat(listenerValueCaptor).hasSize(2);
+ assertListenerCapturedValues(listenerValueCaptor.get(1),
job.keyspaceName, mockTopologyEpoch1, mockTopologyEpoch2);
+ }
+
+ @Test
+ void testMergeTokenRanges()
+ {
+ List<Set<TokenRange>> testAndResults = Arrays.asList(
+ // Test set, Result set
+// ImmutableSet.of(r(1, 10)), ImmutableSet.of(r(1, 10)),
Review Comment:
remove or uncomment?
##########
server/src/test/java/org/apache/cassandra/sidecar/restore/RingTopologyRefresherTest.java:
##########
@@ -141,10 +157,65 @@ void testLoadAndGetMapping()
Future<TokenRangeReplicasResponse> future =
replicaByTokenRangePerKeyspace.futureOf(job);
assertThat(future.isComplete()).isFalse();
assertThat(replicaByTokenRangePerKeyspace.forRestoreJob(job)).isNull();
+ assertThat(listenerValueCaptor).isEmpty();
TokenRangeReplicasResponse mockTopology =
mock(TokenRangeReplicasResponse.class);
replicaByTokenRangePerKeyspace.load(ks -> mockTopology);
assertThat(future.isComplete()).isTrue();
assertThat(future.result()).isSameAs(mockTopology);
assertThat(replicaByTokenRangePerKeyspace.forRestoreJob(job)).isSameAs(mockTopology);
+ assertThat(listenerValueCaptor).hasSize(1);
+ assertListenerCapturedValues(listenerValueCaptor.get(0),
job.keyspaceName, null, mockTopology);
+ }
+
+ @Test
+ void testTopologyChanged()
+ {
+ assertThat(listenerValueCaptor).isEmpty();
+ RestoreJob job = RestoreJobTest.createNewTestingJob(UUIDs.timeBased());
+ replicaByTokenRangePerKeyspace.futureOf(job);
+ TokenRangeReplicasResponse mockTopologyEpoch1 =
mock(TokenRangeReplicasResponse.class);
+ when(mockTopologyEpoch1.writeReplicas())
+ .thenReturn(Collections.singletonList(new
TokenRangeReplicasResponse.ReplicaInfo("1", "10", null)));
+ replicaByTokenRangePerKeyspace.load(ks -> mockTopologyEpoch1);
+ assertThat(listenerValueCaptor).hasSize(1);
+ assertListenerCapturedValues(listenerValueCaptor.get(0),
job.keyspaceName, null, mockTopologyEpoch1);
+ TokenRangeReplicasResponse mockTopologyEpoch2 =
mock(TokenRangeReplicasResponse.class);
+ when(mockTopologyEpoch1.writeReplicas())
+ .thenReturn(Collections.singletonList(new
TokenRangeReplicasResponse.ReplicaInfo("100", "110", null)));
+ replicaByTokenRangePerKeyspace.load(ks -> mockTopologyEpoch2);
+ assertThat(listenerValueCaptor).hasSize(2);
+ assertListenerCapturedValues(listenerValueCaptor.get(1),
job.keyspaceName, mockTopologyEpoch1, mockTopologyEpoch2);
+ }
+
+ @Test
+ void testMergeTokenRanges()
+ {
+ List<Set<TokenRange>> testAndResults = Arrays.asList(
+ // Test set, Result set
+// ImmutableSet.of(r(1, 10)), ImmutableSet.of(r(1, 10)),
+ ImmutableSet.of(r(1, 10), r(11, 20)), ImmutableSet.of(r(1, 10), r(11,
20)),
+ ImmutableSet.of(r(1, 10), r(10, 20)), ImmutableSet.of(r(1, 20)),
+ ImmutableSet.of(r(1, 10), r(5, 20)), ImmutableSet.of(r(1, 20)),
+ ImmutableSet.of(r(1, 10), r(5, 20), r(25, 30)), ImmutableSet.of(r(1,
20), r(25, 30)),
+ ImmutableSet.of(r(1, 4), r(5, 20), r(15, 30)), ImmutableSet.of(r(1,
4), r(5, 30))
+ );
+ for (int i = 0; i < testAndResults.size(); i += 2)
+ {
+
assertThat(RingTopologyRefresher.mergeTokenRanges(testAndResults.get(i))).isEqualTo(testAndResults.get(i
+ 1));
+ }
+ }
Review Comment:
use a parameterized test for this?
```
@ParameterizedTest(name = "{index}: input={0} expected={1}")
@MethodSource("inputTokenRangesToMerge")
void testMergeTokenRanges(Set<TokenRange> input, Set<TokenRange>
expected)
{
assertThat(RingTopologyRefresher.mergeTokenRanges(input)).isEqualTo(expected);
}
public static Stream<Arguments> inputTokenRangesToMerge()
{
return Stream.of(
Arguments.arguments(ImmutableSet.of(r(1, 10), r(11, 20)),
ImmutableSet.of(r(1, 10), r(11, 20))),
Arguments.arguments(ImmutableSet.of(r(1, 10), r(10, 20)),
ImmutableSet.of(r(1, 20))),
Arguments.arguments(ImmutableSet.of(r(1, 10), r(5, 20)),
ImmutableSet.of(r(1, 20))),
Arguments.arguments(ImmutableSet.of(r(1, 10), r(5, 20), r(25, 30)),
ImmutableSet.of(r(1, 20), r(25, 30))),
Arguments.arguments(ImmutableSet.of(r(1, 4), r(5, 20), r(15, 30)),
ImmutableSet.of(r(1, 4), r(5, 30)))
);
}
```
##########
server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java:
##########
@@ -613,16 +658,42 @@ public Builder endToken(BigInteger endToken)
public Builder replicaStatus(Map<String, RestoreRangeStatus>
statusByReplica)
{
+ for (RestoreRangeStatus status : statusByReplica.values())
+ {
+ if (status == RestoreRangeStatus.DISCARDED)
+ {
+ return discard();
+ }
+ }
Review Comment:
this is a comment for the line below: should we just keep an immutable copy
here instead?
```
return update(b -> b.statusByReplica =
Collections.unmodifiableMap(statusByReplica));
```
##########
server/src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java:
##########
@@ -133,6 +133,12 @@ public InstanceMetadata anyInstance()
return instances.get(randomPick);
}
+ public List<InstanceMetadata> allInstances()
Review Comment:
hmm, this might be misleading. This should be renamed from `allInstances` to
`allLocalInstances`.
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java:
##########
@@ -23,13 +23,36 @@
*/
public enum RestoreJobStatus
{
+ /**
+ * The external controller creates the RestoreJob
+ */
CREATED,
+ /**
+ * The external controller updates the status of the RestoreJob to
STAGE_READY.
+ * It indicates that all relevant slices of the RestoreJob have been
uploaded and ready to consume.
+ */
STAGE_READY,
+ /**
+ * All relevant slices are staged and the staged data satisfies the
consistency requirement of the RestoreJob.
+ * The external controller updates the status of the RestoreJob to STAGED.
+ */
STAGED,
+ /**
+ * The external controller updates the status of the RestoreJob to
IMPORT_READY
+ * It indicates that all staged data now are ready to be imported into
Cassandra.
+ */
IMPORT_READY,
@Deprecated // replaced by ABORTED
Review Comment:
NIT
```suggestion
/**
* @deprecated replaced by {@link #ABORTED}
*/
@Deprecated
```
##########
client-common/src/main/java/org/apache/cassandra/sidecar/common/data/RestoreJobStatus.java:
##########
@@ -23,13 +23,36 @@
*/
public enum RestoreJobStatus
{
+ /**
+ * The external controller creates the RestoreJob
+ */
CREATED,
+ /**
+ * The external controller updates the status of the RestoreJob to
STAGE_READY.
+ * It indicates that all relevant slices of the RestoreJob have been
uploaded and ready to consume.
Review Comment:
```suggestion
* It indicates that all relevant slices of the RestoreJob have been
uploaded and ready to be consumed.
```
##########
server/src/testFixtures/java/org/apache/cassandra/sidecar/config/yaml/TestServiceConfiguration.java:
##########
@@ -35,7 +35,7 @@ public static ServiceConfiguration newInstance()
public static Builder builder()
{
return ServiceConfigurationImpl.builder()
- .host("127.0.0.1")
+ .host("0.0.0.0")
Review Comment:
what's the need for this change?
##########
server/src/main/java/org/apache/cassandra/sidecar/cluster/locator/CachedLocalTokenRanges.java:
##########
@@ -78,7 +78,7 @@ public CachedLocalTokenRanges(InstancesMetadata
instancesMetadata, DnsResolver d
}
@Override
- public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace)
+ public Map<Integer, Set<TokenRange>> localTokenRanges(String keyspace,
boolean forceRefresh)
Review Comment:
maybe rename `forceRefresh` to `ignored` and add a javadoc to explain why in
this case `forceRefresh` is ignored?
##########
server/src/main/java/org/apache/cassandra/sidecar/restore/RestoreJobProgress.java:
##########
@@ -106,6 +108,22 @@ private String buildMessage(RestoreJobStatus jobStatus)
return message + " Current job status: " + jobStatus;
}
+ @VisibleForTesting
+ public List<RestoreRange> allRanges()
Review Comment:
it doesn't need to be public; we only consume it for testing purposes
```suggestion
List<RestoreRange> allRanges()
```
##########
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/cluster/locator/TokenRange.java:
##########
@@ -161,6 +164,22 @@ public boolean overlaps(TokenRange other)
&&
other.range.lowerEndpoint().compareTo(this.range.upperEndpoint()) < 0;
}
+ /**
+ * Two ranges are connects with each other when 1) they overlap or 2)
their ends are connected.
Review Comment:
```suggestion
* Two ranges connect with each other when 1) they overlap or 2) their
ends are connected.
```
##########
server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRange.java:
##########
@@ -385,6 +423,11 @@ public BigInteger endToken()
return this.endToken;
}
+ public TokenRange tokenRange()
+ {
+ return new TokenRange(startToken, endToken);
Review Comment:
do we really need to build the token range every time? is it possible to
just keep a field with the token range and remove the `startToken` and
`endToken` fields from this class?
##########
server-common/src/test/java/org/apache/cassandra/sidecar/common/server/data/RestoreRangeStatusTest.java:
##########
@@ -56,7 +59,10 @@ void testInvalidStatusAdvancing()
{ STAGED, STAGED },
{ SUCCEEDED, FAILED },
{ FAILED, SUCCEEDED },
- { FAILED, ABORTED }
+ { FAILED, ABORTED },
+ { DISCARDED, CREATED },
+ { DISCARDED, STAGED },
+ { DISCARDED, SUCCEEDED },
Review Comment:
this should be a parameterized test instead:
```java
@ParameterizedTest(name = "{index}: {0} -> {1}")
@MethodSource("invalidStatusAdvancingSource")
void testInvalidStatusAdvancing(RestoreRangeStatus from,
RestoreRangeStatus to)
{
String commonErrorMsg = "status can only advance to one of the
follow statuses";
assertThatThrownBy(() ->
from.advanceTo(to)).isExactlyInstanceOf(IllegalArgumentException.class)
.hasNoCause()
.hasMessageContaining(commonErrorMsg);
}
public static Stream<Arguments> invalidStatusAdvancingSource()
{
return Stream.of(Arguments.of(STAGED, CREATED),
Arguments.of(CREATED, SUCCEEDED),
Arguments.of(STAGED, STAGED),
Arguments.of(SUCCEEDED, FAILED),
Arguments.of(FAILED, SUCCEEDED),
Arguments.of(FAILED, ABORTED),
Arguments.of(DISCARDED, CREATED),
Arguments.of(DISCARDED, STAGED),
Arguments.of(DISCARDED, SUCCEEDED));
}
```
##########
server/src/main/java/org/apache/cassandra/sidecar/db/RestoreRangeDatabaseAccessor.java:
##########
@@ -70,8 +76,12 @@ public RestoreRange updateStatus(RestoreRange range)
{
sidecarSchema.ensureInitialized();
+ Map<String, String> statusTextByReplica = range.isDiscarded()
Review Comment:
we should probably only add the discard marker for the local replica and not
for all replicas. We should also check the discard marker applies to the local
replica when consumed.
##########
server/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java:
##########
@@ -429,7 +431,7 @@ private static boolean portNotAvailableToBind(Throwable
cause)
static
{
// Settings to reduce the test setup delay incurred if gossip is
enabled
- System.setProperty("cassandra.ring_delay_ms", "5000"); // down from
30s default
+ System.setProperty("cassandra.ring_delay_ms", "5000"); // down from
30s default; this change has no effect is GOSSIP feature is enabled
Review Comment:
```suggestion
System.setProperty("cassandra.ring_delay_ms", "5000"); // down from
30s default; this change has no effect if GOSSIP feature is enabled
```
##########
server/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java:
##########
@@ -99,6 +100,7 @@ public SidecarConfiguration
configuration(CoordinationConfiguration clusterLease
.isEnabled(true)
.build())
.coordinationConfiguration(clusterLeaseClaimTaskConfiguration)
+ .sstableUploadConfiguration(new
SSTableUploadConfigurationImpl(0F))
Review Comment:
there are a few unrelated changes in this PR, and it is hard to follow
sometimes. Why do we want to do this?
##########
server/src/test/java/org/apache/cassandra/sidecar/CassandraSidecarDaemonTest.java:
##########
@@ -87,14 +89,15 @@ void testSuccessfulStartup() throws Exception
CassandraSidecarDaemon.main(NO_ARGS);
WebClient client = WebClient.create(Vertx.vertx());
- HttpResponse<String> response = client.get(9043, "localhost",
"/api/v1/__health")
- .as(BodyCodec.string())
- .send()
- .toCompletionStage()
- .toCompletableFuture()
- .get(10, TimeUnit.SECONDS);
- assertThat(response.statusCode()).isEqualTo(OK.code());
- assertThat(response.body()).isEqualTo("{\"status\":\"OK\"}");
+ loopAssert(10, () -> {
+ HttpResponse<String> response = getBlocking(client.get(9043,
"localhost", "/api/v1/__health")
Review Comment:
flakiness?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]