aratno commented on code in PR #4360:
URL: https://github.com/apache/cassandra/pull/4360#discussion_r2361582368
##########
src/java/org/apache/cassandra/replication/UnreconciledMutations.java:
##########
@@ -197,4 +198,41 @@ else if (cmp > 0)
return collect(statesSet.subSet(start, end), tableId,
includePending, into);
}
}
+
+ @VisibleForTesting
+ boolean equalsForTesting(UnreconciledMutations other)
+ {
+ return this.statesMap.equals(other.statesMap) &&
this.statesSet.equals(other.statesSet);
+ }
+
+ @VisibleForTesting
+ void addForTesting(Mutation mutation)
+ {
+ Entry entry = Entry.create(mutation);
+ entry.visibility = Visibility.VISIBLE;
+ statesMap.put(entry.offset, entry);
+ statesSet.add(entry);
+ }
+
+ static UnreconciledMutations loadFromJournal(Node2OffsetsMap
witnessedOffsets, int localNodeId)
+ {
+ UnreconciledMutations result = new UnreconciledMutations();
+
+ Offsets.Mutable witnessed = witnessedOffsets.get(localNodeId);
+ Offsets.Mutable reconciled = witnessedOffsets.intersection();
+
+ // difference between locally witnessed offsets and fully reconciled
ones is all the ids
+ // that need to be loaded into UnreconciledMutations index
+ Offsets.RangeIterator iter =
Offsets.difference(witnessed.rangeIterator(), reconciled.rangeIterator());
+ while (iter.tryAdvance())
+ {
+ for (int offset = iter.start(), end = iter.end(); offset <= end;
offset++)
+ {
+ ShortMutationId id = new ShortMutationId(witnessed.logId,
offset);
+ result.addForTesting(MutationJournal.instance.read(id));
Review Comment:
Can we avoid calling a `*ForTesting` method from a production path?
##########
src/java/org/apache/cassandra/replication/CoordinatorLog.java:
##########
@@ -343,16 +402,119 @@ private long nextSequenceId()
static class CoordinatorLogReplica extends CoordinatorLog
{
- CoordinatorLogReplica(int localHostId, CoordinatorLogId logId,
Participants participants)
+ CoordinatorLogReplica(
+ String keyspace, Range<Token> range, int localNodeId,
CoordinatorLogId logId, Participants participants,
+ Node2OffsetsMap witnessedOffsets, Node2OffsetsMap persistedOffsets)
{
- super(localHostId, logId, participants);
+ super(keyspace, range, localNodeId, logId, participants,
witnessedOffsets, persistedOffsets);
+ }
+
+ CoordinatorLogReplica(String keyspace, Range<Token> range, int
localNodeId, CoordinatorLogId logId, Participants participants)
+ {
+ super(keyspace, range, localNodeId, logId, participants);
}
@Override
- void receivedWriteResponse(ShortMutationId mutationId, int fromHostId)
+ void receivedWriteResponse(ShortMutationId mutationId, int fromNodeId)
{
// no-op
}
}
+ /*
+ * Persist to / load from system table.
+ */
+
+ private static final String INSERT_QUERY =
+ format("INSERT INTO %s.%s (keyspace_name, range_start, range_end,
host_id, host_log_id, participants, witnessed_offsets, persisted_offsets) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+ SchemaConstants.SYSTEM_KEYSPACE_NAME,
SystemKeyspace.COORDINATOR_LOGS);
+
+ void persistToSystemTable()
+ {
+ Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+ Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+ lock.readLock().lock();
+ try
+ {
+ witnessedOffsets.convertToPrimitiveMap(witnessed);
+ persistedOffsets.convertToPrimitiveMap(persisted);
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ executeInternal(INSERT_QUERY, keyspace, range.left.toString(),
range.right.toString(), logId.hostId,
+ logId.hostLogId, participants.asSet(), witnessed,
persisted);
+ }
+
+ void updateLogsInSystemTable()
+ {
+ Offsets.Mutable localWitnessed;
+ Map<Integer, List<Integer>> witnessed = new Int2ObjectHashMap<>();
+ Map<Integer, List<Integer>> persisted = new Int2ObjectHashMap<>();
+
+ lock.readLock().lock();
+ try
+ {
+ localWitnessed =
Offsets.Mutable.copy(witnessedOffsets.get(localNodeId));
+
+ witnessedOffsets.convertToPrimitiveMap(witnessed);
+ persistedOffsets.convertToPrimitiveMap(persisted);
Review Comment:
Right, I wasn't thinking about the frozen<list> representation at the time. Makes sense as-is for now.
`persistedOffsets` could keep growing for a long time if a single replica is down and cannot reconcile, so that is where I expect this to become expensive in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]