dcapwell commented on code in PR #3656:
URL: https://github.com/apache/cassandra/pull/3656#discussion_r1829911612
##########
test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java:
##########
@@ -66,65 +107,130 @@ protected void destroyState(State<Spec> state, @Nullable
Throwable cause)
}
}
- private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
+ private static BiFunction<RandomSource, Cluster, Spec>
createSchemaSpec(AccordMode mode)
{
- ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(),
- new
InJvmSut(cluster),
- new
TokenPlacementModel.SimpleReplicationFactor(3),
-
SystemUnderTest.ConsistencyLevel.QUORUM);
- cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};",
HarryHelper.KEYSPACE));
- var schema = harry.schema();
- cluster.schemaChange(schema.compile().cql());
- waitForCMSToQuiesce(cluster, cluster.get(1));
- return new Spec(harry);
+ return (rs, cluster) -> {
+ long seed = rs.nextLong();
+ var schema = HarryHelper.schemaSpecBuilder("harry",
"tbl").surjection().inflate(seed);
+ if (mode.kind != AccordMode.Kind.None)
+ schema = schema.withTransactionMode(mode.passthroughMode);
+ ReplayingHistoryBuilder harry = HarryHelper.dataGen(seed,
+ mode.kind == AccordMode.Kind.Direct ? new
AccordSut(cluster) : new InJvmSut(cluster),
+ new TokenPlacementModel.SimpleReplicationFactor(3),
+ SystemUnderTest.ConsistencyLevel.QUORUM,
+ schema);
+ cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH
replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};",
HarryHelper.KEYSPACE));
+ cluster.schemaChange(schema.compile().cql());
+ waitForCMSToQuiesce(cluster, cluster.get(1));
+ return new Spec(harry, mode);
+ };
}
- private static CommandGen<Spec> cqlOperations(Spec spec)
+ private static class HarryCommand extends SimpleCommand<State<Spec>>
{
- class HarryCommand extends SimpleCommand<State<Spec>>
+ HarryCommand(Function<State<Spec>, String> name, Consumer<State<Spec>>
fn)
{
- HarryCommand(Function<State<Spec>, String> name,
Consumer<State<Spec>> fn)
- {
- super(name, fn);
- }
+ super(name, fn);
+ }
- @Override
- public PreCheckResult checkPreconditions(State<Spec> state)
- {
- int clusterSize = state.topologyHistory.up().length;
- return clusterSize >= 3 ? PreCheckResult.Ok :
PreCheckResult.Ignore;
- }
+ @Override
+ public PreCheckResult checkPreconditions(State<Spec> state)
+ {
+ int clusterSize = state.topologyHistory.up().length;
+ return clusterSize >= 3 ? PreCheckResult.Ok :
PreCheckResult.Ignore;
}
+ }
+
+ private static CommandGen<Spec> cqlOperations(Spec spec)
+ {
Command<State<Spec>, Void, ?> insert = new HarryCommand(state ->
"Harry Insert" + state.commandNamePostfix(), state -> {
spec.harry.insert();
((HarryState) state).numInserts++;
});
- Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state ->
"Harry Validate All" + state.commandNamePostfix(), state -> {
- spec.harry.validateAll(spec.harry.quiescentChecker());
- ((HarryState) state).numInserts = 0;
- });
return (rs, state) -> {
HarryState harryState = (HarryState) state;
TopologyHistory history = state.topologyHistory;
// if any topology change happened, then always validate all
if (harryState.generation != history.generation())
{
harryState.generation = history.generation();
- return validateAll;
+ return validateAll(state);
}
if ((harryState.numInserts > 0 && rs.decide(0.2))) // 20% of the
time do reads
- return validateAll;
+ return validateAll(state);
return insert;
};
}
+ private static Command<State<Spec>, Void, ?> validateAll(State<Spec> state)
+ {
+ Spec spec = state.schemaSpec;
+ var schema = spec.harry.schema();
+ boolean writeThroughAccord = schema.isWriteTimeFromAccord();
+ List<Command<State<Spec>, Void, ?>> reads = new ArrayList<>();
+ Model model = spec.harry.quiescentChecker();
+ for (Long pd : new TreeSet<>(spec.harry.pds()))
+ {
+ reads.add(new HarryCommand(s -> "Harry Validate pd=" + pd +
state.commandNamePostfix(), s -> model.validate(Query.selectAllColumns(schema,
pd, false))));
+ // as of this writing Accord does not support ORDER BY
+ if (!writeThroughAccord)
+ reads.add(new HarryCommand(s -> "Harry Reverse Validate pd=" +
pd + state.commandNamePostfix(), s ->
model.validate(Query.selectAllColumns(schema, pd, true))));
+ }
+// if (reads.isEmpty())
+// throw new IllegalStateException("Attempted to read when no
partitions have been written to");
+ reads.add(new HarryCommand(s -> "Reset Harry Write State" +
state.commandNamePostfix(), s -> ((HarryState) s).numInserts = 0));
+ return Property.multistep(reads);
+ }
+
+ private static class AccordSut extends InJvmSut
+ {
+ private AccordSut(Cluster cluster)
+ {
+ super(cluster, roundRobin(cluster), retryOnTimeout(), 10, 3);
+ }
+
+ @Override
+ public Object[][] execute(String statement, ConsistencyLevel cl, int
coordinator, int pageSize, Object... bindings)
+ {
+ return super.execute(wrapInTxn(statement), cl, coordinator,
pageSize, bindings);
+ }
+
+ @Override
+ protected void onException(Throwable t)
+ {
+ t = Throwables.getRootCause(t);
+ if (!TIMEOUT_CHECKER.matches(t)) return;
+
+ TxnId id;
+ try
+ {
+ id = TxnId.parse(t.getMessage());
Review Comment:
This just adds extra debugging; it doesn't "swallow" anything, as it is
effectively a listener in Harry. Harry manages retries and throwing, but for
each exception we check whether it's an Accord timeout that carries the TxnId
and, if so, try to get extra debugging information.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]