aratno commented on code in PR #4565:
URL: https://github.com/apache/cassandra/pull/4565#discussion_r2777592924
##########
src/java/org/apache/cassandra/streaming/StreamOperation.java:
##########
@@ -47,6 +47,11 @@ public enum StreamOperation
this.requiresBarrierTransaction = requiresBarrierTransaction;
}
+ public boolean isTrackable()
Review Comment:
I was thinking about names for this method. "isTrackable" implies
support for tracking, but this is more of a requirement: if you don't track
these operations, you'll break correctness. What about something like
"requiresTrackedReconciliation"?
A comment here could be useful too, along the lines of:
> For tracked keyspaces, streams that are received while serving reads must
> be tracked in the log, to ensure that read reconciliation can represent the new
> data. Repairs and imports are handled specially as CoordinatedTransfer for
> tracked keyspaces.
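To make the rename concrete, here is a minimal sketch of how the method and the suggested comment could look together. The enum `Op`, its constants, and their flag values are placeholders of mine and are not taken from the PR; only the method name and the Javadoc text come from the suggestion above.

```java
final class StreamOperationSketch
{
    // Hypothetical, trimmed-down stand-in for StreamOperation.
    // The constants and their flag values are placeholders, not taken from the PR.
    enum Op
    {
        OTHER_STREAM(true),
        REPAIR(false);

        private final boolean requiresTrackedReconciliation;

        Op(boolean requiresTrackedReconciliation)
        {
            this.requiresTrackedReconciliation = requiresTrackedReconciliation;
        }

        /**
         * For tracked keyspaces, streams that are received while serving reads must be tracked in the log,
         * to ensure that read reconciliation can represent the new data. Repairs and imports are handled
         * specially as CoordinatedTransfer for tracked keyspaces.
         */
        public boolean requiresTrackedReconciliation()
        {
            return requiresTrackedReconciliation;
        }
    }
}
```

With this name, a call site reads as an obligation ("this stream requires tracked reconciliation") rather than as an optional capability.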
##########
src/java/org/apache/cassandra/utils/concurrent/Ref.java:
##########
@@ -100,8 +100,8 @@
public final class Ref<T> implements RefCounted<T>
{
static final Logger logger = LoggerFactory.getLogger(Ref.class);
- public static final boolean TRACE_ENABLED = TEST_DEBUG_REF_COUNT.getBoolean();
- public static final boolean DEBUG_EVENTS_ENABLED = TEST_DEBUG_REF_EVENTS.getBoolean();
+ public static final boolean TRACE_ENABLED = TEST_DEBUG_REF_COUNT.getBoolean(true);
+ public static final boolean DEBUG_EVENTS_ENABLED = TEST_DEBUG_REF_EVENTS.getBoolean(true);
Review Comment:
Revert before merge?
##########
src/java/org/apache/cassandra/replication/UnreconciledMutations.java:
##########
@@ -274,10 +274,11 @@ static UnreconciledMutations loadFromJournal(Node2OffsetsMap witnessedOffsets, i
result.addDirectly(mutation);
continue;
}
- CoordinatedTransfer transfer = LocalTransfers.instance().getActivatedTransfer(id);
+ CoordinatedTransfer transfer = TransferTrackingService.instance().getActivatedTransfer(id);
if (transfer != null)
{
- result.transfers.add(transfer.id(), transfer.sstables);
+ Preconditions.checkState(transfer instanceof TrackedImportTransfer);
+ result.transfers.add(transfer.id(), ((TrackedImportTransfer) transfer).sstables);
Review Comment:
We need to support TrackedRepairTransfer inclusion here to make sure
summaries include activated repairs after restart.
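A rough sketch of the shape this could take, dispatching on the transfer subtype instead of asserting that it is always an import. All types below are hypothetical stand-ins; in particular, the `bounds` field on the repair transfer and the `String` representations are assumptions for illustration, not the PR's actual fields.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ActivatedTransferReloadSketch
{
    // Hypothetical stand-ins for the PR's types; field names and types are assumptions.
    interface CoordinatedTransfer
    {
        String id();
    }

    static final class TrackedImportTransfer implements CoordinatedTransfer
    {
        final String id;
        final List<String> sstables;

        TrackedImportTransfer(String id, List<String> sstables)
        {
            this.id = id;
            this.sstables = sstables;
        }

        public String id() { return id; }
    }

    static final class TrackedRepairTransfer implements CoordinatedTransfer
    {
        final String id;
        final String bounds; // assumed: a repair transfer is summarized by the range it covers

        TrackedRepairTransfer(String id, String bounds)
        {
            this.id = id;
            this.bounds = bounds;
        }

        public String id() { return id; }
    }

    // Records an activated transfer of either kind; unknown kinds fail loudly.
    static void addActivated(Map<String, Object> transfers, CoordinatedTransfer transfer)
    {
        if (transfer instanceof TrackedImportTransfer)
            transfers.put(transfer.id(), ((TrackedImportTransfer) transfer).sstables);
        else if (transfer instanceof TrackedRepairTransfer)
            transfers.put(transfer.id(), ((TrackedRepairTransfer) transfer).bounds);
        else
            throw new IllegalStateException("Unexpected transfer type: " + transfer.getClass().getName());
    }

    // Small usage example: both kinds end up in the reloaded set.
    static Map<String, Object> example()
    {
        Map<String, Object> transfers = new LinkedHashMap<>();
        addActivated(transfers, new TrackedImportTransfer("import-1", Arrays.asList("sstable-a", "sstable-b")));
        addActivated(transfers, new TrackedRepairTransfer("repair-1", "(0,100]"));
        return transfers;
    }
}
```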
##########
src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java:
##########
@@ -336,9 +337,10 @@ public final void abort()
protected Map<MetadataType, MetadataComponent> finalizeMetadata()
{
- // Migration from incremental repair to mutation tracking will be supported, but support for mixing
- // incremental repair and mutation tracking is not planned
- if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+ // Reconciliation should not occur before activation for coordinated transfer streams for tracked keyspaces.
+ boolean reconcile = txn.opType() != OperationType.STREAM;
+
+ if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE && reconcile)
Review Comment:
Makes sense 👍
##########
src/java/org/apache/cassandra/streaming/StreamPlan.java:
##########
@@ -147,6 +143,15 @@ public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, RangesA
return this;
}
+ /**
+ * Do we want to include mutation logs for repair? Probably not, because we already flush before repair, and want
+ * repair to correct issues in the LSM, ignoring the log entirely.
+ */
Review Comment:
I wrote this comment; could you remove it? It should be clear to operators
that running full repair for tracked keyspaces is strictly for cases
outside the scope of log reconciliation, or a fallback when it's
necessary to pause log reconciliation.
##########
src/java/org/apache/cassandra/replication/CoordinatorLog.java:
##########
@@ -374,7 +375,8 @@ void finishActivation(PendingLocalTransfer transfer, TransferActivation activati
return;
// This is the only difference with finishWriting - can we consolidate these methods?
- unreconciledMutations.activatedTransfer(activation.id(), transfer.sstables);
+ if (bounds != null) // TODO: Is this the correct way to handle an empty repair/sync? What bounds would we even use?
+ unreconciledMutations.activatedTransfer(activation.id(), bounds);
Review Comment:
Should we use the bounds of the repair here (or of the covering Merkle tree
mismatch), instead of the bounds of the SSTable? If two replicas participate in
a repair, we want them to include the same transfer ID for reads within the
mismatching range, even if one didn't receive any SSTables.
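To illustrate the concern with a toy model: the `Bounds` class below is a simplified token range (left exclusive, right inclusive, no wraparound) invented for this example, and a `null` value stands for a replica that received no SSTables. None of these names come from the PR.

```java
final class ActivationBoundsSketch
{
    // Simplified token range: left exclusive, right inclusive, no wraparound handling.
    static final class Bounds
    {
        final long left;
        final long right;

        Bounds(long left, long right)
        {
            this.left = left;
            this.right = right;
        }

        boolean contains(long token)
        {
            return token > left && token <= right;
        }
    }

    // Would a read at this token include the transfer ID, given the bounds a replica registered?
    // A null registration models "nothing registered", e.g. no SSTables were received.
    static boolean includesTransfer(Bounds registered, long token)
    {
        return registered != null && registered.contains(token);
    }

    // Two replicas agree for a token only if both include, or both omit, the transfer ID.
    static boolean replicasAgree(Bounds replicaA, Bounds replicaB, long token)
    {
        return includesTransfer(replicaA, token) == includesTransfer(replicaB, token);
    }
}
```

In this model, if replica A registers its SSTable bounds `(10,20]` and replica B registers nothing, the two disagree for a read at token 15. If both register the repaired range, say `(0,100]`, they agree for every token, regardless of which SSTables each one received.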
##########
test/distributed/org/apache/cassandra/distributed/impl/Instance.java:
##########
@@ -1036,11 +1036,13 @@ public Future<Void> shutdown(boolean runOnExitThreads, boolean shutdownMessaging
AccordService.instance().shutdownAndWait(1l, MINUTES);
});
+ // The periodic tasks in MTS might hit the CommitLog, so make sure those shut down first...
+ error = parallelRun(error, executor, MutationTrackingService.instance::shutdownBlocking);
+ error = parallelRun(error, executor, MutationJournal.instance::shutdownBlocking);
Review Comment:
Thanks for fixing this!
##########
src/java/org/apache/cassandra/repair/messages/SyncResponse.java:
##########
@@ -45,20 +50,29 @@ public class SyncResponse extends RepairMessage
public final List<SessionSummary> summaries;
- public SyncResponse(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
+ @Nullable
+ public final TimeUUID planId;
+ @Nullable
+ public final ShortMutationId transferId;
+
+ public SyncResponse(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries, TimeUUID planId, ShortMutationId transferId)
{
super(desc);
this.nodes = nodes;
this.success = success;
this.summaries = summaries;
+ this.planId = planId;
+ this.transferId = transferId;
Review Comment:
Pondering whether we still need planId in SyncStat if it's here now...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]