dcapwell commented on code in PR #3174:
URL: https://github.com/apache/cassandra/pull/3174#discussion_r1526569608


##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -91,14 +99,24 @@ public class CassandraRepairJob extends AbstractRepairJob
      *  @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public CassandraRepairJob(RepairSession session, String columnFamily)
+    public RepairJob(RepairSession session, String columnFamily)
     {
-        super(session, columnFamily);
         this.ctx = session.ctx;
         this.session = session;
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
         this.desc = new RepairJobDesc(session.state.parentRepairSession, 
session.getId(), session.state.keyspace, columnFamily, 
session.state.commonRange.ranges);
+        this.ks = Keyspace.open(desc.keyspace);
+        this.cfs = ks.getColumnFamilyStore(columnFamily);
+        this.state = new JobState(ctx.clock(), desc, 
session.state.commonRange.endpoints);
+
+        TableMetadata metadata = this.cfs.metadata();
+        if (session.paxosOnly && !metadata.supportsPaxosOperations())
+            throw new IllegalArgumentException(String.format("Cannot run paxos 
only repair on %s.%s, which isn't configured for paxos operations", 
cfs.keyspace.getName(), cfs.name));

Review Comment:
   It would be nice to check this during coordinator validation: if we are
repairing multiple tables and only one of them hits this condition, we waste a
lot of work.



##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -31,65 +32,93 @@
 import accord.primitives.Seekables;
 import org.apache.cassandra.dht.AccordSplitter;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.AccordTopology;
 import org.apache.cassandra.service.accord.TokenRange;
-import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
 
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Collections.emptyList;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL;
 
 /*
  * Accord repair consists of creating a barrier transaction for all the ranges 
which ensure that all Accord transactions
  * before the Epoch and point in time at which the repair started have their 
side effects visible to Paxos and regular quorum reads.
  */
-public class AccordRepairJob extends AbstractRepairJob
+public class AccordRepair
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AccordRepairJob.class);
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordRepair.class);
 
     public static final BigInteger TWO = BigInteger.valueOf(2);
 
+    interface Listener
+    {
+        void onBarrierStart();
+        void onBarrierException(Throwable throwable);
+        void onBarrierComplete();
+
+        Listener NOOP = new Listener()
+        {
+            @Override public void onBarrierStart() {}
+
+            @Override public void onBarrierException(Throwable throwable) {}
+
+            @Override public void onBarrierComplete() {}
+        };
+    }
+
+    private final Listener listener;
+
     private final Ranges ranges;
 
     private final AccordSplitter splitter;
 
     private BigInteger rangeStep;
 
-    private Epoch minEpoch = ClusterMetadata.current().epoch;
+    private final Epoch minEpoch = ClusterMetadata.current().epoch;
 
     private volatile Throwable shouldAbort = null;
 
-    public AccordRepairJob(RepairSession repairSession, String cfname)
+    public AccordRepair(Listener listener, IPartitioner partitioner, String 
keyspace, Collection<Range<Token>> ranges)
     {
-        super(repairSession, cfname);
-        IPartitioner partitioner = 
desc.ranges.iterator().next().left.getPartitioner();
-        this.ranges = AccordTopology.toAccordRanges(desc.keyspace, 
desc.ranges);
-        this.splitter = partitioner.accordSplitter().apply(ranges);
+        this.listener = listener != null ? listener : Listener.NOOP;

Review Comment:
   `listener` is currently always null; the caller does the following:
   
   ```
   AccordRepair repair = new AccordRepair(null, partitioner, desc.keyspace, 
desc.ranges);
   ```
   
   This change dropped the metrics in favor of listeners, but the listeners 
don't do anything yet, so the metrics are effectively lost.



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -265,7 +265,7 @@ private AccordService(Id localId)
     {
         Invariants.checkState(localId != null, "static localId must be set 
before instantiating AccordService");
         logger.info("Starting accord with nodeId {}", localId);
-        AccordAgent agent = new AccordAgent();
+        AccordAgent agent = 
FBUtilities.construct(System.getProperty("cassandra.accord.agent", 
AccordAgent.class.getName()), "AccordAgent");

Review Comment:
   Also, if this is for testing rather than pluggability, should it use the 
`cassandra.test` prefix?



##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -164,19 +198,59 @@ public void onFailure(Throwable t)
             return;
         }
 
+        Future<Void> accordRepair;
+        if (doAccordRepair)
+        {
+            accordRepair = paxosRepair.flatMap(unused -> {
+                logger.info("{} {}.{} starting accord repair", 
session.previewKind.logPrefix(session.getId()), desc.keyspace, 
desc.columnFamily);
+                IPartitioner partitioner = 
desc.ranges.iterator().next().left.getPartitioner();

Review Comment:
   ```suggestion
                   IPartitioner partitioner = metadata.getPartitioner();
   ```
   
   Rather than walking the ranges, why not pull the partitioner from the table 
metadata, since we already know the table?



##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -265,7 +265,7 @@ private AccordService(Id localId)
     {
         Invariants.checkState(localId != null, "static localId must be set 
before instantiating AccordService");
         logger.info("Starting accord with nodeId {}", localId);
-        AccordAgent agent = new AccordAgent();
+        AccordAgent agent = 
FBUtilities.construct(System.getProperty("cassandra.accord.agent", 
AccordAgent.class.getName()), "AccordAgent");

Review Comment:
   This will no longer build, as we no longer allow arbitrary system properties 
in the code; checkstyle *should* block this. This needs to migrate to the 
`CassandraRelevantProperties` enum.



##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -91,14 +99,24 @@ public class CassandraRepairJob extends AbstractRepairJob
      *  @param session RepairSession that this RepairJob belongs
      * @param columnFamily name of the ColumnFamily to repair
      */
-    public CassandraRepairJob(RepairSession session, String columnFamily)
+    public RepairJob(RepairSession session, String columnFamily)
     {
-        super(session, columnFamily);
         this.ctx = session.ctx;
         this.session = session;
         this.taskExecutor = session.taskExecutor;
         this.parallelismDegree = session.parallelismDegree;
         this.desc = new RepairJobDesc(session.state.parentRepairSession, 
session.getId(), session.state.keyspace, columnFamily, 
session.state.commonRange.ranges);
+        this.ks = Keyspace.open(desc.keyspace);
+        this.cfs = ks.getColumnFamilyStore(columnFamily);
+        this.state = new JobState(ctx.clock(), desc, 
session.state.commonRange.endpoints);
+
+        TableMetadata metadata = this.cfs.metadata();
+        if (session.paxosOnly && !metadata.supportsPaxosOperations())
+            throw new IllegalArgumentException(String.format("Cannot run paxos 
only repair on %s.%s, which isn't configured for paxos operations", 
cfs.keyspace.getName(), cfs.name));
+
+        if (session.accordOnly && !metadata.requiresAccordSupport())
+            throw new IllegalArgumentException(String.format("Cannot run 
accord only repair on %s.%s, which isn't configured for accord operations", 
cfs.keyspace.getName(), cfs.name));

Review Comment:
   Same comment as for the paxos-only check: we should check this at the start 
of repair validation.



##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -150,20 +179,17 @@ private void repairRange(TokenRange range) throws 
Throwable
             }
             catch (RuntimeException e)
             {
-                // TODO Placeholder for dependency limit overflow
-//                dependencyOverflow = true;
-                cfs.metric.rangeMigrationDependencyLimitFailures.mark();
+                listener.onBarrierException(e);
                 throw e;
             }
             catch (Throwable t)
             {
-                // unexpected error
-                cfs.metric.rangeMigrationUnexpectedFailures.mark();
+                listener.onBarrierException(t);
                 throw new RuntimeException(t);
             }
             finally
             {
-                cfs.metric.rangeMigration.addNano(start);

Review Comment:
   Why drop the metric? Also, I think this forgot to compute `end - start`?



##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -150,20 +179,17 @@ private void repairRange(TokenRange range) throws 
Throwable
             }
             catch (RuntimeException e)
             {
-                // TODO Placeholder for dependency limit overflow
-//                dependencyOverflow = true;
-                cfs.metric.rangeMigrationDependencyLimitFailures.mark();

Review Comment:
   Why drop the metric?



##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -150,20 +179,17 @@ private void repairRange(TokenRange range) throws 
Throwable
             }
             catch (RuntimeException e)
             {
-                // TODO Placeholder for dependency limit overflow
-//                dependencyOverflow = true;
-                cfs.metric.rangeMigrationDependencyLimitFailures.mark();
+                listener.onBarrierException(e);
                 throw e;
             }
             catch (Throwable t)
             {
-                // unexpected error
-                cfs.metric.rangeMigrationUnexpectedFailures.mark();

Review Comment:
   Why drop the metric?



##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -114,26 +132,41 @@ public long getNowInSeconds()
         }
     }
 
+    @Override
+    public void run()
+    {
+        state.phase.start();
+        cfs.metric.repairsStarted.inc();
+        runRepair();
+    }
+
     /**
      * Runs repair job.
      *
      * This sets up necessary task and runs them on given {@code taskExecutor}.
      * After submitting all tasks, waits until validation with replica 
completes.
      */
-    @Override
     protected void runRepair()
     {
         List<InetAddressAndPort> allEndpoints = new 
ArrayList<>(session.state.commonRange.endpoints);
         allEndpoints.add(ctx.broadcastAddressAndPort());
 
+        TableMetadata metadata = cfs.metadata();
+
         Future<List<TreeResponse>> treeResponses;
         Future<Void> paxosRepair;
         Epoch repairStartingEpoch = ClusterMetadata.current().epoch;
-        boolean doPaxosRepair = paxosRepairEnabled() && (((useV2() || 
isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly);
+
+        Preconditions.checkArgument(!session.paxosOnly || !session.accordOnly);

Review Comment:
   Can we move this check to the repair coordinator? Also, can we fail with an 
error message that is useful to operators?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to