ifesdjeen commented on code in PR #3196:
URL: https://github.com/apache/cassandra/pull/3196#discussion_r1555957713
##########
src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java:
##########
@@ -307,17 +307,51 @@ public Builder withReplicaGroup(VersionedEndpoints.ForRange replicas)
                 (t, v) -> {
                     EndpointsForRange old = v.get();
                     return VersionedEndpoints.forRange(Epoch.max(v.lastModified(), replicas.lastModified()),
-                                                       replicas.get()
-                                                               .newBuilder(replicas.size() + old.size())
-                                                               .addAll(old)
-                                                               .addAll(replicas.get(), ReplicaCollection.Builder.Conflict.ALL)
-                                                               .build());
+                                                       combine(old, replicas.get()));
                 });
             if (group == null)
                 replicaGroups.put(replicas.range(), replicas);
             return this;
         }
+        /**
+         * Combine two replica groups, assuming one is the current group and the other is the proposed.
+         * During range movements this is used when calculating the maximal placement, which combines the current and
+         * future replica groups. This special cases the merging of two replica groups to make sure that when a replica
+         * moves from transient to full, it starts to act as a FULL write replica as early as possible.
+         *
+         * Where an endpoint is present in both groups, prefer the proposed iff it is a FULL replica. During a
+         * multi-step operation (join/leave/move), we want any change from transient to full to happen as early
+         * as possible so that a replica being whose ownership is modified in this way becomes FULL for writes before it
Review Comment:
nit: Could you rephrase this part a bit?
> a replica being whose ownership is modified in this way becomes FULL for writes before it
I am a bit thrown off by the "replica being" part.
##########
src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java:
##########
@@ -125,4 +144,78 @@ public String toString()
                ", compiled=" + (addToWrites == null) +
                '}';
     }
+
+
+    /**
+     * Makes sure that a newly added read replica for a range already exists as a write replica
+     *
+     * We should never add both read & write replicas for the same range at the same time (or read replica before write)
+     *
+     * Also makes sure that we don't add a full read replica while the same write replica is only transient - we should
+     * always make the write replica full before adding the read replica.
+     *
+     * We split and merge ranges, so in the previous placements we could have full write replicas (a, b], (b, c], but then
+     * add a full read replica (a, c].
+     *
+     * @return null if everything is good, otherwise a Transformation.Result rejection containing information about the bad replica
+     */
+    @Nullable
+    public static Transformation.Result assertPreExistingWriteReplica(DataPlacements placements, PlacementTransitionPlan transitionPlan)
+    {
+        return assertPreExistingWriteReplica(placements,
+                                             transitionPlan.toSplit,
+                                             transitionPlan.addToWrites(),
+                                             transitionPlan.moveReads(),
+                                             transitionPlan.removeFromWrites());
+    }
+
+    @Nullable
+    public static Transformation.Result assertPreExistingWriteReplica(DataPlacements placements, PlacementDeltas ... deltasInOrder)
+    {
+        for (PlacementDeltas deltas : deltasInOrder)
+        {
+            for (Map.Entry<ReplicationParams, PlacementDeltas.PlacementDelta> entry : deltas)
+            {
+                ReplicationParams params = entry.getKey();
+                PlacementDeltas.PlacementDelta delta = entry.getValue();
+                for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> addedRead : delta.reads.additions.entrySet())
+                {
+                    RangesAtEndpoint addedReadReplicas = addedRead.getValue();
+                    RangesAtEndpoint existingWriteReplicas = placements.get(params).writes.byEndpoint().get(addedRead.getKey());
+                    // we're adding read replicas - they should always exist as write replicas before doing that
+                    // BUT we split and merge ranges so we need to check containment both ways
+                    for (Replica newReadReplica : addedReadReplicas)
+                    {
+                        if (existingWriteReplicas.contains(newReadReplica))
+                            continue;
+                        boolean contained = false;
+                        Set<Range<Token>> intersectingRanges = new HashSet<>();
+                        for (Replica writeReplica : existingWriteReplicas)
+                        {
+                            if (writeReplica.isFull() == newReadReplica.isFull() || writeReplica.isFull() && newReadReplica.isTransient())
Review Comment:
nit: would you mind adding brackets here to make the operator order more
explicit?
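Something like the following is what I have in mind. It is only a sketch: `PrecedenceSketch` and its two methods are made-up names, plain booleans stand in for the `Replica` calls, and it assumes `isTransient()` is simply the negation of `isFull()`. Since `&&` binds tighter than `||` in Java, the brackets do not change behaviour, they only make the grouping visible.

```java
// Hypothetical stand-in for the condition in the hunk above; not the actual Cassandra code.
final class PrecedenceSketch
{
    // Condition as written in the patch: relies on && binding tighter than ||
    static boolean implicit(boolean writeFull, boolean readFull)
    {
        boolean readTransient = !readFull; // assumption: isTransient() == !isFull()
        return writeFull == readFull || writeFull && readTransient;
    }

    // Same condition with the grouping spelled out
    static boolean explicit(boolean writeFull, boolean readFull)
    {
        boolean readTransient = !readFull; // assumption: isTransient() == !isFull()
        return (writeFull == readFull) || (writeFull && readTransient);
    }

    public static void main(String[] args)
    {
        boolean[] values = { true, false };
        for (boolean w : values)
            for (boolean r : values)
                System.out.println("write full=" + w + ", read full=" + r + " -> " + explicit(w, r));
    }
}
```

Under that assumption, the only combination for which the condition is false is a transient write replica paired with a full read replica, which matches the javadoc above.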
##########
src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java:
##########
@@ -307,17 +307,51 @@ public Builder withReplicaGroup(VersionedEndpoints.ForRange replicas)
                 (t, v) -> {
                     EndpointsForRange old = v.get();
                     return VersionedEndpoints.forRange(Epoch.max(v.lastModified(), replicas.lastModified()),
-                                                       replicas.get()
-                                                               .newBuilder(replicas.size() + old.size())
-                                                               .addAll(old)
-                                                               .addAll(replicas.get(), ReplicaCollection.Builder.Conflict.ALL)
-                                                               .build());
+                                                       combine(old, replicas.get()));
                 });
             if (group == null)
                 replicaGroups.put(replicas.range(), replicas);
             return this;
         }
+        /**
+         * Combine two replica groups, assuming one is the current group and the other is the proposed.
+         * During range movements this is used when calculating the maximal placement, which combines the current and
+         * future replica groups. This special cases the merging of two replica groups to make sure that when a replica
+         * moves from transient to full, it starts to act as a FULL write replica as early as possible.
+         *
+         * Where an endpoint is present in both groups, prefer the proposed iff it is a FULL replica. During a
+         * multi-step operation (join/leave/move), we want any change from transient to full to happen as early
+         * as possible so that a replica being whose ownership is modified in this way becomes FULL for writes before it
+         * becomes FULL for reads. This works as additions to write replica groups are applied before any other
+         * placement changes (i.e. in START_[JOIN|LEAVE|MOVE]).
+         *
+         * @param prev Initial set of replicas for a given range
+         * @param next Proposed set of replicas for the same range.
+         * @return The union of the two groups
+         */
+        private EndpointsForRange combine(EndpointsForRange prev, EndpointsForRange next)
+        {
+            Map<InetAddressAndPort, Replica> e1 = prev.byEndpoint();
+            Map<InetAddressAndPort, Replica> e2 = next.byEndpoint();
+            EndpointsForRange.Builder combined = prev.newBuilder(prev.size() + next.size());
+            e1.forEach((e, r1) -> {
+                Replica r2 = e2.get(e);
+                if (null == r2) // not present in next
+                    combined.add(r1);
+                else if (r2.isFull()) // prefer replica from next, if it is moving from transient to full
Review Comment:
nit: could probably short-circuit this with
```
if (r2 != null && r2.isFull())
combined.add(r2);
else
combined.add(r1);
```
If you have deliberately split these, I do not mind leaving it as-is.
##########
src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java:
##########
@@ -307,17 +307,51 @@ public Builder withReplicaGroup(VersionedEndpoints.ForRange replicas)
                 (t, v) -> {
                     EndpointsForRange old = v.get();
                     return VersionedEndpoints.forRange(Epoch.max(v.lastModified(), replicas.lastModified()),
-                                                       replicas.get()
-                                                               .newBuilder(replicas.size() + old.size())
-                                                               .addAll(old)
-                                                               .addAll(replicas.get(), ReplicaCollection.Builder.Conflict.ALL)
-                                                               .build());
+                                                       combine(old, replicas.get()));
                 });
             if (group == null)
                 replicaGroups.put(replicas.range(), replicas);
             return this;
         }
+        /**
+         * Combine two replica groups, assuming one is the current group and the other is the proposed.
+         * During range movements this is used when calculating the maximal placement, which combines the current and
+         * future replica groups. This special cases the merging of two replica groups to make sure that when a replica
+         * moves from transient to full, it starts to act as a FULL write replica as early as possible.
+         *
+         * Where an endpoint is present in both groups, prefer the proposed iff it is a FULL replica. During a
+         * multi-step operation (join/leave/move), we want any change from transient to full to happen as early
+         * as possible so that a replica being whose ownership is modified in this way becomes FULL for writes before it
+         * becomes FULL for reads. This works as additions to write replica groups are applied before any other
+         * placement changes (i.e. in START_[JOIN|LEAVE|MOVE]).
+         *
+         * @param prev Initial set of replicas for a given range
+         * @param next Proposed set of replicas for the same range.
+         * @return The union of the two groups
+         */
+        private EndpointsForRange combine(EndpointsForRange prev, EndpointsForRange next)
+        {
+            Map<InetAddressAndPort, Replica> e1 = prev.byEndpoint();
+            Map<InetAddressAndPort, Replica> e2 = next.byEndpoint();
+            EndpointsForRange.Builder combined = prev.newBuilder(prev.size() + next.size());
+            e1.forEach((e, r1) -> {
+                Replica r2 = e2.get(e);
+                if (null == r2) // not present in next
+                    combined.add(r1);
+                else if (r2.isFull()) // prefer replica from next, if it is moving from transient to full
+                    combined.add(r2);
+                else
+                    combined.add(r1); // replica is moving from full to transient, or staying the same
+            });
+            // any new replicas not in prev
+            e2.forEach((e, r2) -> {
Review Comment:
Would it make sense to construct a set to iterate over in the above loop?
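To illustrate what I mean, here is a rough sketch with a single loop over the union of endpoints. It uses a deliberately simplified model (`CombineSketch`, `String` endpoints, a tiny `Replica` class), all hypothetical rather than the real `EndpointsForRange` API, and it assumes the second loop in the patch only adds replicas from `next` whose endpoint is absent from `prev`.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: endpoints are Strings and a replica is just
// an endpoint plus a full/transient flag. Not the real Cassandra types.
final class CombineSketch
{
    static final class Replica
    {
        final String endpoint;
        final boolean full;

        Replica(String endpoint, boolean full)
        {
            this.endpoint = endpoint;
            this.full = full;
        }

        boolean isFull()
        {
            return full;
        }
    }

    static Map<String, Replica> combine(Map<String, Replica> prev, Map<String, Replica> next)
    {
        // Union of endpoints: prev's first, then any that only appear in next
        Set<String> endpoints = new LinkedHashSet<>(prev.keySet());
        endpoints.addAll(next.keySet());

        Map<String, Replica> combined = new LinkedHashMap<>();
        for (String e : endpoints)
        {
            Replica r1 = prev.get(e);
            Replica r2 = next.get(e);
            // Take next's replica if the endpoint is new, or if it is FULL in next
            // (covers transient -> full); otherwise keep prev's
            if (r1 == null || (r2 != null && r2.isFull()))
                combined.put(e, r2);
            else
                combined.put(e, r1);
        }
        return combined;
    }

    public static void main(String[] args)
    {
        Map<String, Replica> prev = new LinkedHashMap<>();
        prev.put("a", new Replica("a", true));
        prev.put("b", new Replica("b", false));
        Map<String, Replica> next = new LinkedHashMap<>();
        next.put("b", new Replica("b", true));
        next.put("c", new Replica("c", false));
        combine(prev, next).forEach((e, r) -> System.out.println(e + " full=" + r.isFull()));
    }
}
```

Whether this reads better than the two `forEach` passes is a judgment call; the set costs an extra allocation but keeps all of the selection logic in one place.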
##########
test/harry/main/org/apache/cassandra/harry/sut/TokenPlacementModel.java:
##########
@@ -199,41 +228,28 @@ public static List<Node> peerStateToNodes(Object[][] resultset)
     public static class NtsReplicationFactor extends ReplicationFactor
     {
-        private final Lookup lookup = new DefaultLookup();
Review Comment:
Oh, thank you for removing this! This was an artifact of integrating with a
simulator, but it was never actually used in this context.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]