aratno commented on code in PR #4565:
URL: https://github.com/apache/cassandra/pull/4565#discussion_r2777592924


##########
src/java/org/apache/cassandra/streaming/StreamOperation.java:
##########
@@ -47,6 +47,11 @@ public enum StreamOperation
         this.requiresBarrierTransaction = requiresBarrierTransaction;
     }
 
+    public boolean isTrackable()

Review Comment:
   I was thinking about names for this method, since "isTrackable" implies 
support for tracking, but this is more of a requirement: if you're not tracking 
these operations, you'll break correctness. What about something like 
"requiresTrackedReconciliation"?
   
   Could be useful to have a comment here too, along the lines of:
   > For tracked keyspaces, streams that are received while serving reads must 
be tracked in the log, to ensure that read reconciliation can represent the new 
data. Repairs and imports are handled specially as CoordinatedTransfer for 
tracked keyspaces.



##########
src/java/org/apache/cassandra/utils/concurrent/Ref.java:
##########
@@ -100,8 +100,8 @@
 public final class Ref<T> implements RefCounted<T>
 {
     static final Logger logger = LoggerFactory.getLogger(Ref.class);
-    public static final boolean TRACE_ENABLED = TEST_DEBUG_REF_COUNT.getBoolean();
-    public static final boolean DEBUG_EVENTS_ENABLED = TEST_DEBUG_REF_EVENTS.getBoolean();
+    public static final boolean TRACE_ENABLED = TEST_DEBUG_REF_COUNT.getBoolean(true);
+    public static final boolean DEBUG_EVENTS_ENABLED = TEST_DEBUG_REF_EVENTS.getBoolean(true);

Review Comment:
   Revert before merge?
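
   For context on why this looks like a leftover, here is a sketch of how a property-backed flag behaves with `getBoolean()` versus `getBoolean(true)`. It assumes the overload supplies the value used when the system property is unset. `BooleanProperty` and the key `example.debug.refcount` are hypothetical stand-ins, not CassandraRelevantProperties.

```java
/**
 * Hypothetical model of a system-property-backed boolean flag; it mirrors the shape of
 * getBoolean() vs getBoolean(overrideDefaultValue) but is not CassandraRelevantProperties.
 */
public class DebugFlagDefaults
{
    static final class BooleanProperty
    {
        private final String key;
        private final boolean defaultValue;

        BooleanProperty(String key, boolean defaultValue)
        {
            this.key = key;
            this.defaultValue = defaultValue;
        }

        /** Falls back to the property's own default when the key is unset. */
        boolean getBoolean()
        {
            return getBoolean(defaultValue);
        }

        /** Falls back to the caller-supplied default when the key is unset. */
        boolean getBoolean(boolean overrideDefaultValue)
        {
            String value = System.getProperty(key);
            return value == null ? overrideDefaultValue : Boolean.parseBoolean(value);
        }
    }

    // Hypothetical key; the real property name is not shown in the review.
    static final BooleanProperty DEBUG_REF_COUNT = new BooleanProperty("example.debug.refcount", false);

    public static void main(String[] args)
    {
        System.clearProperty("example.debug.refcount");
        System.out.println(DEBUG_REF_COUNT.getBoolean());     // false: tracing stays off
        System.out.println(DEBUG_REF_COUNT.getBoolean(true)); // true: tracing is on unless disabled
    }
}
```

   With the override, Ref tracing and event debugging would be enabled for every run that does not set the properties explicitly.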



##########
src/java/org/apache/cassandra/replication/UnreconciledMutations.java:
##########
@@ -274,10 +274,11 @@ static UnreconciledMutations loadFromJournal(Node2OffsetsMap witnessedOffsets, i
                     result.addDirectly(mutation);
                     continue;
                 }
-                CoordinatedTransfer transfer = LocalTransfers.instance().getActivatedTransfer(id);
+                CoordinatedTransfer transfer = TransferTrackingService.instance().getActivatedTransfer(id);
                 if (transfer != null)
                 {
-                    result.transfers.add(transfer.id(), transfer.sstables);
+                    Preconditions.checkState(transfer instanceof TrackedImportTransfer);
+                    result.transfers.add(transfer.id(), ((TrackedImportTransfer) transfer).sstables);

Review Comment:
   We need to support TrackedRepairTransfer inclusion here to make sure 
summaries include activated repairs after restart.
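
   A minimal sketch of journal replay that accepts both transfer kinds instead of asserting TrackedImportTransfer. All types are hypothetical stand-ins (String IDs instead of ShortMutationId, token bounds as plain longs). Recording repair transfers by bounds is an assumption taken from the `activatedTransfer(activation.id(), bounds)` change in CoordinatorLog. Requires Java 17 (records, instanceof patterns).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: rebuild activated transfers on restart for imports and repairs. */
public class LoadActivatedTransfers
{
    interface CoordinatedTransfer { String id(); }

    // Stand-in: an import transfer is identified by the SSTables it added.
    record TrackedImportTransfer(String id, List<String> sstables) implements CoordinatedTransfer {}

    // Stand-in: a repair transfer may have received no SSTables, so it carries token bounds.
    record TrackedRepairTransfer(String id, long left, long right) implements CoordinatedTransfer {}

    /** Stand-in for the summary state rebuilt from the journal. */
    static final class Transfers
    {
        final Map<String, String> activated = new LinkedHashMap<>();

        void addSSTables(String id, List<String> sstables) { activated.put(id, "sstables=" + sstables); }
        void addBounds(String id, long left, long right) { activated.put(id, "bounds=(" + left + ", " + right + "]"); }
    }

    static Transfers load(List<CoordinatedTransfer> activatedFromJournal)
    {
        Transfers result = new Transfers();
        for (CoordinatedTransfer transfer : activatedFromJournal)
        {
            if (transfer instanceof TrackedImportTransfer imported)
                result.addSSTables(imported.id(), imported.sstables());
            else if (transfer instanceof TrackedRepairTransfer repair)
                result.addBounds(repair.id(), repair.left(), repair.right()); // keeps repairs in summaries after restart
            else
                throw new IllegalStateException("Unexpected transfer type: " + transfer);
        }
        return result;
    }

    public static void main(String[] args)
    {
        Transfers transfers = load(List.of(new TrackedImportTransfer("import-1", List.of("nb-1-big")),
                                           new TrackedRepairTransfer("repair-1", 0, 100)));
        System.out.println(transfers.activated);
    }
}
```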



##########
src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java:
##########
@@ -336,9 +337,10 @@ public final void abort()
 
     protected Map<MetadataType, MetadataComponent> finalizeMetadata()
     {
-        // Migration from incremental repair to mutation tracking will be supported, but support for mixing
-        // incremental repair and mutation tracking is not planned
-        if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+        // Reconciliation should not occur before activation for coordinated transfer streams for tracked keyspaces.
+        boolean reconcile = txn.opType() != OperationType.STREAM;
+
+        if (metadata().replicationType().isTracked() && repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE && reconcile)

Review Comment:
   Makes sense 👍 
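
   A truth-table sketch of the new guard that makes its three inputs explicit. Only the condition is modelled, since what the branch does is not visible in the hunk. `OperationType` is a trimmed stand-in, and `UNREPAIRED_SSTABLE` is assumed to be 0 as in ActiveRepairService.

```java
/**
 * Hypothetical sketch of the guard in finalizeMetadata(); only the condition is modelled.
 * OperationType is a trimmed stand-in; UNREPAIRED_SSTABLE mirrors ActiveRepairService (assumed 0).
 */
public class FinalizeMetadataGuardSketch
{
    enum OperationType { FLUSH, COMPACTION, STREAM }

    static final long UNREPAIRED_SSTABLE = 0;

    static boolean entersTrackedBranch(boolean tracked, long repairedAt, OperationType opType)
    {
        // Streamed SSTables must not be reconciled before their transfer is activated.
        boolean reconcile = opType != OperationType.STREAM;
        return tracked && repairedAt == UNREPAIRED_SSTABLE && reconcile;
    }

    public static void main(String[] args)
    {
        System.out.println(entersTrackedBranch(true, UNREPAIRED_SSTABLE, OperationType.FLUSH));  // true
        System.out.println(entersTrackedBranch(true, UNREPAIRED_SSTABLE, OperationType.STREAM)); // false: waits for activation
        System.out.println(entersTrackedBranch(false, UNREPAIRED_SSTABLE, OperationType.FLUSH)); // false: untracked keyspace
        System.out.println(entersTrackedBranch(true, 1234L, OperationType.FLUSH));               // false: already repaired
    }
}
```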



##########
src/java/org/apache/cassandra/streaming/StreamPlan.java:
##########
@@ -147,6 +143,15 @@ public StreamPlan transferRanges(InetAddressAndPort to, String keyspace, RangesA
         return this;
     }
 
+    /**
+     * Do we want to include mutation logs for repair? Probably not, because we already flush before repair, and want
+     * repair to correct issues in the LSM, ignoring the log entirely.
+     */

Review Comment:
   I wrote this comment; could you remove it? It should be clear to operators
that running full repair for tracked keyspaces is strictly to handle cases
outside the scope of log reconciliation, or as a backup in case it's
necessary to pause log reconciliation.



##########
src/java/org/apache/cassandra/replication/CoordinatorLog.java:
##########
@@ -374,7 +375,8 @@ void finishActivation(PendingLocalTransfer transfer, TransferActivation activati
                 return;
 
             // This is the only difference with finishWriting - can we consolidate these methods?
-            unreconciledMutations.activatedTransfer(activation.id(), transfer.sstables);
+            if (bounds != null) // TODO: Is this the correct way to handle an empty repair/sync? What bounds would we even use?
+                unreconciledMutations.activatedTransfer(activation.id(), bounds);

Review Comment:
   Should we use the bounds of the repair here (or of the covering Merkle Tree 
mismatch), instead of the bounds of the SSTable? If two replicas participate in 
a repair, we want them to include the same transfer ID for reads within the 
mismatching range, even if one didn't receive any SSTables.
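
   A minimal sketch of the divergence described above. It uses plain long tokens and closed ranges for simplicity (Cassandra's token ranges are left-exclusive). `Range`, `SSTable`, and the helper methods are hypothetical. `readIncludesTransfer` stands in for whatever decides whether a read's reconciliation summary carries the transfer ID.

```java
import java.util.List;

/** Hypothetical sketch: SSTable-derived vs repair-derived activation bounds. */
public class ActivationBoundsSketch
{
    // Closed token range [left, right] for simplicity.
    record Range(long left, long right)
    {
        boolean contains(long token) { return token >= left && token <= right; }
    }

    record SSTable(long firstToken, long lastToken) {}

    // Current approach: bounds span whatever SSTables this replica received (null if none).
    static Range boundsFromSSTables(List<SSTable> received)
    {
        if (received.isEmpty())
            return null;
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (SSTable sstable : received)
        {
            min = Math.min(min, sstable.firstToken());
            max = Math.max(max, sstable.lastToken());
        }
        return new Range(min, max);
    }

    // Suggested approach: every participant uses the repair's (or Merkle tree mismatch's) range.
    static Range boundsFromRepair(Range mismatch)
    {
        return mismatch;
    }

    // Would a read of this token include the transfer ID?
    static boolean readIncludesTransfer(Range bounds, long token)
    {
        return bounds != null && bounds.contains(token);
    }

    public static void main(String[] args)
    {
        Range mismatch = new Range(0, 100);
        List<SSTable> replicaA = List.of(new SSTable(10, 20)); // A received one SSTable
        List<SSTable> replicaB = List.of();                    // B received nothing
        long token = 15;

        System.out.println("SSTable bounds: A=" + readIncludesTransfer(boundsFromSSTables(replicaA), token)
                           + " B=" + readIncludesTransfer(boundsFromSSTables(replicaB), token));
        System.out.println("Repair bounds:  A=" + readIncludesTransfer(boundsFromRepair(mismatch), token)
                           + " B=" + readIncludesTransfer(boundsFromRepair(mismatch), token));
    }
}
```

   With SSTable-derived bounds the two replicas disagree for token 15 (`A=true B=false`), and B has nothing to activate at all, which is the `bounds != null` case in the TODO. Using the mismatch range gives both replicas the same answer for every token in it.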



##########
test/distributed/org/apache/cassandra/distributed/impl/Instance.java:
##########
@@ -1036,11 +1036,13 @@ public Future<Void> shutdown(boolean runOnExitThreads, boolean shutdownMessaging
                 AccordService.instance().shutdownAndWait(1l, MINUTES);
             });
 
+            // The periodic tasks in MTS might hit the CommitLog, so make sure those shut down first...
+            error = parallelRun(error, executor, MutationTrackingService.instance::shutdownBlocking);
+            error = parallelRun(error, executor, MutationJournal.instance::shutdownBlocking);

Review Comment:
   Thanks for fixing this!
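
   A sketch of why consecutive `parallelRun` calls give an ordering guarantee. It assumes the dtest helper blocks until its tasks finish and merges failures into the accumulated `Throwable`. The helper below is a hypothetical reimplementation, not Instance's code, and the `CommitLog` step stands in for the later shutdown that the hunk does not show.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of ordered, error-accumulating shutdown groups. */
public class OrderedShutdownSketch
{
    interface ThrowingRunnable { void run() throws Exception; }

    static Throwable merge(Throwable accumulate, Throwable t)
    {
        if (accumulate == null)
            return t;
        accumulate.addSuppressed(t);
        return accumulate;
    }

    // Runs one group concurrently and blocks until all of it finishes, so the next call starts afterwards.
    static Throwable parallelRun(Throwable accumulate, ExecutorService executor, ThrowingRunnable... tasks)
    {
        List<Future<?>> futures = new ArrayList<>();
        for (ThrowingRunnable task : tasks)
            futures.add(executor.submit(() -> { task.run(); return null; }));
        for (Future<?> future : futures)
        {
            try
            {
                future.get();
            }
            catch (ExecutionException e)
            {
                accumulate = merge(accumulate, e.getCause());
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                accumulate = merge(accumulate, e);
            }
        }
        return accumulate;
    }

    public static void main(String[] args)
    {
        List<String> order = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Throwable error = null;

        // Stand-ins for the real components: owners of periodic tasks stop before the commit log.
        error = parallelRun(error, executor, () -> order.add("MutationTrackingService"));
        error = parallelRun(error, executor, () -> { order.add("MutationJournal"); throw new IllegalStateException("journal close failed"); });
        error = parallelRun(error, executor, () -> order.add("CommitLog"));

        executor.shutdown();
        System.out.println(order);              // [MutationTrackingService, MutationJournal, CommitLog]
        System.out.println(error.getMessage()); // journal close failed
    }
}
```

   A failure in one group is recorded but does not stop later groups, so the CommitLog stand-in still shuts down after a failed journal close.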



##########
src/java/org/apache/cassandra/repair/messages/SyncResponse.java:
##########
@@ -45,20 +50,29 @@ public class SyncResponse extends RepairMessage
 
     public final List<SessionSummary> summaries;
 
-    public SyncResponse(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries)
+    @Nullable
+    public final TimeUUID planId;
+    @Nullable
+    public final ShortMutationId transferId;
+
+    public SyncResponse(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries, TimeUUID planId, ShortMutationId transferId)
     {
         super(desc);
         this.nodes = nodes;
         this.success = success;
         this.summaries = summaries;
+        this.planId = planId;
+        this.transferId = transferId;

Review Comment:
   Pondering whether we still need planId in SyncStat if it's here now...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
