maedhroz commented on code in PR #4428:
URL: https://github.com/apache/cassandra/pull/4428#discussion_r2529072914
##########
src/java/org/apache/cassandra/replication/ImmutableCoordinatorLogOffsets.java:
##########
@@ -123,24 +127,90 @@ public Builder add(MutationId mutationId)
return this;
}
- public Builder addAll(CoordinatorLogOffsets<?> logOffsets)
+ private Builder addAll(CoordinatorLogOffsets.Mutations<? extends
Offsets> mutations)
{
- for (long log : logOffsets)
+ for (long log : mutations)
{
- Offsets offsets = logOffsets.offsets(log);
+ Offsets offsets = mutations.offsets(log);
ids.computeIfAbsent(log, logId -> new
Offsets.Immutable.Builder(new CoordinatorLogId(logId)))
.addAll(offsets);
}
return this;
}
+ public Builder addAll(CoordinatorLogOffsets<?> logOffsets)
+ {
+ addAll(logOffsets.mutations());
+ ActivatedTransfers newTransfers = logOffsets.transfers();
+ if (transfers == null)
+ transfers = newTransfers;
+ else
+ transfers.addAll(newTransfers);
+ return this;
+ }
+
public Builder addAll(Offsets.Immutable offsets)
{
ids.computeIfAbsent(offsets.logId.asLong(), logId -> new
Offsets.Immutable.Builder(new CoordinatorLogId(logId)))
.addAll(offsets);
return this;
}
+ @VisibleForTesting
+ public Builder addTransfer(ShortMutationId activationId, Bounds<Token>
bounds)
+ {
+ if (activationId.isNone())
+ return this;
+ if (transfers == null)
+ transfers = new ActivatedTransfers();
+ transfers.add(activationId, bounds);
+ return this;
+ }
+
+ public Builder addTransfer(ShortMutationId activationId,
Collection<SSTableReader> sstables)
+ {
+ if (activationId.isNone())
+ return this;
+ if (transfers == null)
+ transfers = new ActivatedTransfers();
+ transfers.add(activationId, sstables);
+ return this;
+ }
+
+ public Builder addTransfers(ActivatedTransfers other)
+ {
+ if (other.isEmpty())
+ return this;
+ if (transfers == null)
+ transfers = other;
+ else
+ transfers.addAll(other);
+ return this;
+ }
+
+ /**
+ * Removes expired transfers
+ */
+ public void purgeTransfers()
+ {
+ Predicate<ShortMutationId> pred =
MutationTrackingService.instance::isDurablyReconciled;
+ int purged = 0;
+ if (transfers != null)
+ {
+ Iterator<ShortMutationId> iter = transfers.iterator();
+ while (iter.hasNext()) {
Review Comment:
nit/formatting: Move the opening `{` onto its own line (same for the one further down, on line 203)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]