dcapwell commented on code in PR #3174:
URL: https://github.com/apache/cassandra/pull/3174#discussion_r1526569608
##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -91,14 +99,24 @@ public class CassandraRepairJob extends AbstractRepairJob
* @param session RepairSession that this RepairJob belongs
* @param columnFamily name of the ColumnFamily to repair
*/
- public CassandraRepairJob(RepairSession session, String columnFamily)
+ public RepairJob(RepairSession session, String columnFamily)
{
- super(session, columnFamily);
this.ctx = session.ctx;
this.session = session;
this.taskExecutor = session.taskExecutor;
this.parallelismDegree = session.parallelismDegree;
this.desc = new RepairJobDesc(session.state.parentRepairSession,
session.getId(), session.state.keyspace, columnFamily,
session.state.commonRange.ranges);
+ this.ks = Keyspace.open(desc.keyspace);
+ this.cfs = ks.getColumnFamilyStore(columnFamily);
+ this.state = new JobState(ctx.clock(), desc,
session.state.commonRange.endpoints);
+
+ TableMetadata metadata = this.cfs.metadata();
+ if (session.paxosOnly && !metadata.supportsPaxosOperations())
+ throw new IllegalArgumentException(String.format("Cannot run paxos
only repair on %s.%s, which isn't configured for paxos operations",
cfs.keyspace.getName(), cfs.name));
Review Comment:
It would be nice to check this in coordinator validation: if we are repairing
multiple tables and only one hits this condition, we have a lot of wasted work.
##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -31,65 +32,93 @@
import accord.primitives.Seekables;
import org.apache.cassandra.dht.AccordSplitter;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.AccordTopology;
import org.apache.cassandra.service.accord.TokenRange;
-import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationRepairResult;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
import static com.google.common.base.Preconditions.checkState;
-import static java.util.Collections.emptyList;
import static
org.apache.cassandra.config.CassandraRelevantProperties.ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL;
/*
* Accord repair consists of creating a barrier transaction for all the ranges
which ensure that all Accord transactions
* before the Epoch and point in time at which the repair started have their
side effects visible to Paxos and regular quorum reads.
*/
-public class AccordRepairJob extends AbstractRepairJob
+public class AccordRepair
{
- private static final Logger logger =
LoggerFactory.getLogger(AccordRepairJob.class);
+ private static final Logger logger =
LoggerFactory.getLogger(AccordRepair.class);
public static final BigInteger TWO = BigInteger.valueOf(2);
+ interface Listener
+ {
+ void onBarrierStart();
+ void onBarrierException(Throwable throwable);
+ void onBarrierComplete();
+
+ Listener NOOP = new Listener()
+ {
+ @Override public void onBarrierStart() {}
+
+ @Override public void onBarrierException(Throwable throwable) {}
+
+ @Override public void onBarrierComplete() {}
+ };
+ }
+
+ private final Listener listener;
+
private final Ranges ranges;
private final AccordSplitter splitter;
private BigInteger rangeStep;
- private Epoch minEpoch = ClusterMetadata.current().epoch;
+ private final Epoch minEpoch = ClusterMetadata.current().epoch;
private volatile Throwable shouldAbort = null;
- public AccordRepairJob(RepairSession repairSession, String cfname)
+ public AccordRepair(Listener listener, IPartitioner partitioner, String
keyspace, Collection<Range<Token>> ranges)
{
- super(repairSession, cfname);
- IPartitioner partitioner =
desc.ranges.iterator().next().left.getPartitioner();
- this.ranges = AccordTopology.toAccordRanges(desc.keyspace,
desc.ranges);
- this.splitter = partitioner.accordSplitter().apply(ranges);
+ this.listener = listener != null ? listener : Listener.NOOP;
Review Comment:
`listener` is currently null; we do the following:
```
AccordRepair repair = new AccordRepair(null, partitioner, desc.keyspace,
desc.ranges);
```
This change dropped the metrics in favor of listeners, but the listeners
don't do anything yet...
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -265,7 +265,7 @@ private AccordService(Id localId)
{
Invariants.checkState(localId != null, "static localId must be set
before instantiating AccordService");
logger.info("Starting accord with nodeId {}", localId);
- AccordAgent agent = new AccordAgent();
+ AccordAgent agent =
FBUtilities.construct(System.getProperty("cassandra.accord.agent",
AccordAgent.class.getName()), "AccordAgent");
Review Comment:
Also, if this is for testing and not for pluggability, maybe use a
`cassandra.test` prefix?
##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -164,19 +198,59 @@ public void onFailure(Throwable t)
return;
}
+ Future<Void> accordRepair;
+ if (doAccordRepair)
+ {
+ accordRepair = paxosRepair.flatMap(unused -> {
+ logger.info("{} {}.{} starting accord repair",
session.previewKind.logPrefix(session.getId()), desc.keyspace,
desc.columnFamily);
+ IPartitioner partitioner =
desc.ranges.iterator().next().left.getPartitioner();
Review Comment:
```suggestion
IPartitioner partitioner = metadata.getPartitioner();
```
Rather than walking the ranges, we already know the table, so why not pull the partitioner from there?
##########
src/java/org/apache/cassandra/service/accord/AccordService.java:
##########
@@ -265,7 +265,7 @@ private AccordService(Id localId)
{
Invariants.checkState(localId != null, "static localId must be set
before instantiating AccordService");
logger.info("Starting accord with nodeId {}", localId);
- AccordAgent agent = new AccordAgent();
+ AccordAgent agent =
FBUtilities.construct(System.getProperty("cassandra.accord.agent",
AccordAgent.class.getName()), "AccordAgent");
Review Comment:
This will no longer build, as we don't allow arbitrary system properties in the
code anymore; checkstyle *should* block this. This needs to migrate to the related
properties enum.
##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -91,14 +99,24 @@ public class CassandraRepairJob extends AbstractRepairJob
* @param session RepairSession that this RepairJob belongs
* @param columnFamily name of the ColumnFamily to repair
*/
- public CassandraRepairJob(RepairSession session, String columnFamily)
+ public RepairJob(RepairSession session, String columnFamily)
{
- super(session, columnFamily);
this.ctx = session.ctx;
this.session = session;
this.taskExecutor = session.taskExecutor;
this.parallelismDegree = session.parallelismDegree;
this.desc = new RepairJobDesc(session.state.parentRepairSession,
session.getId(), session.state.keyspace, columnFamily,
session.state.commonRange.ranges);
+ this.ks = Keyspace.open(desc.keyspace);
+ this.cfs = ks.getColumnFamilyStore(columnFamily);
+ this.state = new JobState(ctx.clock(), desc,
session.state.commonRange.endpoints);
+
+ TableMetadata metadata = this.cfs.metadata();
+ if (session.paxosOnly && !metadata.supportsPaxosOperations())
+ throw new IllegalArgumentException(String.format("Cannot run paxos
only repair on %s.%s, which isn't configured for paxos operations",
cfs.keyspace.getName(), cfs.name));
+
+ if (session.accordOnly && !metadata.requiresAccordSupport())
+ throw new IllegalArgumentException(String.format("Cannot run
accord only repair on %s.%s, which isn't configured for accord operations",
cfs.keyspace.getName(), cfs.name));
Review Comment:
Same comment as for the paxos-only check: we should check this at the start of repair validation.
##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -150,20 +179,17 @@ private void repairRange(TokenRange range) throws
Throwable
}
catch (RuntimeException e)
{
- // TODO Placeholder for dependency limit overflow
-// dependencyOverflow = true;
- cfs.metric.rangeMigrationDependencyLimitFailures.mark();
+ listener.onBarrierException(e);
throw e;
}
catch (Throwable t)
{
- // unexpected error
- cfs.metric.rangeMigrationUnexpectedFailures.mark();
+ listener.onBarrierException(t);
throw new RuntimeException(t);
}
finally
{
- cfs.metric.rangeMigration.addNano(start);
Review Comment:
Why drop this metric? Also, I think the removed call forgot to do `end - start`?
##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -150,20 +179,17 @@ private void repairRange(TokenRange range) throws
Throwable
}
catch (RuntimeException e)
{
- // TODO Placeholder for dependency limit overflow
-// dependencyOverflow = true;
- cfs.metric.rangeMigrationDependencyLimitFailures.mark();
Review Comment:
why drop metric?
##########
src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java:
##########
@@ -150,20 +179,17 @@ private void repairRange(TokenRange range) throws
Throwable
}
catch (RuntimeException e)
{
- // TODO Placeholder for dependency limit overflow
-// dependencyOverflow = true;
- cfs.metric.rangeMigrationDependencyLimitFailures.mark();
+ listener.onBarrierException(e);
throw e;
}
catch (Throwable t)
{
- // unexpected error
- cfs.metric.rangeMigrationUnexpectedFailures.mark();
Review Comment:
why drop metric?
##########
src/java/org/apache/cassandra/repair/RepairJob.java:
##########
@@ -114,26 +132,41 @@ public long getNowInSeconds()
}
}
+ @Override
+ public void run()
+ {
+ state.phase.start();
+ cfs.metric.repairsStarted.inc();
+ runRepair();
+ }
+
/**
* Runs repair job.
*
* This sets up necessary task and runs them on given {@code taskExecutor}.
* After submitting all tasks, waits until validation with replica
completes.
*/
- @Override
protected void runRepair()
{
List<InetAddressAndPort> allEndpoints = new
ArrayList<>(session.state.commonRange.endpoints);
allEndpoints.add(ctx.broadcastAddressAndPort());
+ TableMetadata metadata = cfs.metadata();
+
Future<List<TreeResponse>> treeResponses;
Future<Void> paxosRepair;
Epoch repairStartingEpoch = ClusterMetadata.current().epoch;
- boolean doPaxosRepair = paxosRepairEnabled() && (((useV2() ||
isMetadataKeyspace()) && session.repairPaxos) || session.paxosOnly);
+
+ Preconditions.checkArgument(!session.paxosOnly || !session.accordOnly);
Review Comment:
Can we move this to the repair coordinator? Also, can we fail with a useful
error message for operators?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]