belliottsmith commented on code in PR #2056:
URL: https://github.com/apache/cassandra/pull/2056#discussion_r1057170160
##########
test/simulator/main/org/apache/cassandra/simulator/ActionList.java:
##########
@@ -41,6 +41,13 @@ public class ActionList extends AbstractCollection<Action>
public static ActionList empty() { return EMPTY; }
public static ActionList of(Action action) { return new ActionList(new
Action[] { action }); }
public static ActionList of(Stream<Action> action) { return new
ActionList(action.toArray(Action[]::new)); }
+ public static ActionList of(Stream<Action> action, Stream<Action>...
actions)
+ {
+ Stream<Action> accm = action;
+ for (Stream<Action> a : actions)
Review Comment:
probably cleaner (and more efficient) to use
`Stream.of(actions).flatMap(a->a)`
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java:
##########
@@ -227,7 +149,7 @@ public Action get()
{
if (simulated.time.nanoTime() >= untilNanos)
{
- available.add(next);
+
IntStream.of(partitions).mapToObj(Integer::valueOf).forEach(available::add);
Review Comment:
`.boxed()?`
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java:
##########
@@ -124,101 +122,25 @@ public ActionPlan plan()
plan = plan.encapsulate(ActionPlan.setUpTearDown(
ActionList.of(
- cluster.stream().map(i -> simulated.run("Insert Partitions",
i, executeForPrimaryKeys(preInsertStmt(), primaryKeys)))
- ).andThen(
- // TODO (now): this is temporary until we have correct epoch
handling
- ActionList.of(
- cluster.stream().map(i -> simulated.run("Create Accord
Epoch", i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
- )
-// ).andThen(
-// // TODO (now): this is temporary until we have
parameterisation of simulation
-// ActionList.of(
-// cluster.stream().map(i -> simulated.run("Disable Accord
Cache", i, () -> AccordService.instance.setCacheSize(0)))
-// )
+ cluster.stream().map(i -> simulated.run("Insert Partitions",
i, executeForPrimaryKeys(preInsertStmt(), primaryKeys))),
+ cluster.stream().map(i -> simulated.run("Create Accord Epoch",
i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
),
ActionList.of(
cluster.stream().map(i ->
SimulatedActionTask.unsafeTask("Shutdown " + i.broadcastAddress(), RELIABLE,
RELIABLE_NO_TIMEOUTS, simulated, i, i::shutdown))
)
));
- final int nodes = cluster.size();
- for (int primaryKey : primaryKeys)
- historyCheckers.add(new HistoryChecker(primaryKey));
-
- List<Supplier<Action>> primaryKeyActions = new ArrayList<>();
- for (int pki = 0 ; pki < primaryKeys.length ; ++pki)
- {
- int primaryKey = primaryKeys[pki];
- HistoryChecker historyChecker = historyCheckers.get(pki);
- Supplier<Action> supplier = new Supplier<Action>()
- {
- int i = 0;
-
- @Override
- public Action get()
- {
- int node = simulated.random.uniform(1, nodes + 1);
- IInvokableInstance instance = cluster.get(node);
- switch (serialConsistency)
- {
- default: throw new AssertionError();
- case LOCAL_SERIAL:
- if (simulated.snitch.dcOf(node) > 0)
- {
- // perform some queries against these nodes
but don't expect them to be linearizable
- return nonVerifying(i++, instance, primaryKey,
historyChecker);
- }
- case SERIAL:
- return simulated.random.decide(readRatio)
- ? verifying(i++, instance, primaryKey,
historyChecker)
- : modifying(i++, instance, primaryKey,
historyChecker);
- }
- }
-
- @Override
- public String toString()
- {
- return Integer.toString(primaryKey);
- }
- };
-
- final ActionListener listener = debug.debug(PARTITION,
simulated.time, cluster, KEYSPACE, primaryKey);
- if (listener != null)
- {
- Supplier<Action> wrap = supplier;
- supplier = new Supplier<Action>()
- {
- @Override
- public Action get()
- {
- Action action = wrap.get();
- action.register(listener);
- return action;
- }
-
- @Override
- public String toString()
- {
- return wrap.toString();
- }
- };
- }
-
- primaryKeyActions.add(supplier);
- }
+ BiFunction<SimulatedSystems, int[], Supplier<Action>> factory =
actionFactory();
List<Integer> available = IntStream.range(0,
primaryKeys.length).boxed().collect(Collectors.toList());
Action stream = Actions.infiniteStream(concurrency, new
Supplier<Action>() {
@Override
public Action get()
{
- int i = simulated.random.uniform(0, available.size());
- int next = available.get(i);
- available.set(i, available.get(available.size() - 1));
- available.remove(available.size() - 1);
+ int[] partitions = consume(simulated.random, available);
Review Comment:
`primaryKeys`? or `selectedPrimaryKeys`? To keep nomenclature consistent
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import javax.annotation.Nullable;
+
+public interface HistoryValidator
+{
+ Checker start(Observation observation);
Review Comment:
`start` is perhaps a confusing name when the operation has completed, and we
refer to start/end times of an observation. `begin` seems less confusing, but
perhaps `log` or `enter` or something else to distinguish it would be better.
I also don't think we should be saving these as registers inside the
checkers themselves - if we want this API (and I'm fine with it) we should
allocate a temporary object. Efficiency here isn't important, and it is much
clearer and less prone to misuse.
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java:
##########
@@ -47,90 +56,34 @@
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.QueryResults;
-import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordTestUtils;
import org.apache.cassandra.service.accord.txn.TxnData;
-import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.simulator.Action;
import org.apache.cassandra.simulator.Debug;
import org.apache.cassandra.simulator.RunnableActionScheduler;
import org.apache.cassandra.simulator.cluster.ClusterActions;
import org.apache.cassandra.simulator.systems.SimulatedSystems;
import org.apache.cassandra.simulator.utils.IntRange;
+import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
import static org.apache.cassandra.simulator.paxos.HistoryChecker.fail;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.apache.cassandra.utils.AssertionUtils.hasCause;
+import static org.apache.cassandra.utils.AssertionUtils.hasCauseAnyOf;
+import static org.assertj.core.api.Assertions.anyOf;
Review Comment:
unused
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import javax.annotation.Nullable;
+
+import accord.verify.StrictSerializabilityVerifier;
+import com.carrotsearch.hppc.IntIntHashMap;
+import com.carrotsearch.hppc.IntIntMap;
+
+public class StrictSerializabilityValidator implements HistoryValidator,
HistoryValidator.Checker
+{
+ private final StrictSerializabilityVerifier verifier;
+ private final IntIntMap index;
+ private Observation observation;
+
+ public StrictSerializabilityValidator(int[] primaryKeys)
+ {
+ this.verifier = new StrictSerializabilityVerifier(primaryKeys.length);
+ index = new IntIntHashMap(primaryKeys.length);
+ for (int i = 0; i < primaryKeys.length; i++)
+ index.put(primaryKeys[i], i);
+ }
+
+ @Override
+ public Checker start(Observation observation)
+ {
+ this.observation = observation;
+ verifier.begin();
+ return this;
+ }
+
+ @Override
+ public void log(@Nullable Integer pk)
+ {
+ if (pk == null) verifier.print();
+ else verifier.print(get(pk));
+ }
+
+ private int get(int pk)
+ {
+ if (index.containsKey(pk))
+ return index.get(pk);
+ throw new IllegalArgumentException("Unknown pk=" + pk);
+ }
+
+ @Override
+ public void witnessRead(int pk, int count, int[] seq)
+ {
+ convertHistoryViolation(() -> verifier.witnessRead(get(pk), seq));
Review Comment:
I don't believe this call can throw a violation? Nor the write one.
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java:
##########
@@ -124,101 +122,25 @@ public ActionPlan plan()
plan = plan.encapsulate(ActionPlan.setUpTearDown(
ActionList.of(
- cluster.stream().map(i -> simulated.run("Insert Partitions",
i, executeForPrimaryKeys(preInsertStmt(), primaryKeys)))
- ).andThen(
- // TODO (now): this is temporary until we have correct epoch
handling
- ActionList.of(
- cluster.stream().map(i -> simulated.run("Create Accord
Epoch", i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
- )
-// ).andThen(
-// // TODO (now): this is temporary until we have
parameterisation of simulation
-// ActionList.of(
-// cluster.stream().map(i -> simulated.run("Disable Accord
Cache", i, () -> AccordService.instance.setCacheSize(0)))
-// )
+ cluster.stream().map(i -> simulated.run("Insert Partitions",
i, executeForPrimaryKeys(preInsertStmt(), primaryKeys))),
+ cluster.stream().map(i -> simulated.run("Create Accord Epoch",
i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
Review Comment:
The TODO is still valid?
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java:
##########
@@ -18,21 +18,38 @@
package org.apache.cassandra.simulator.paxos;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+
class Observation implements Comparable<Observation>
{
final int id;
- final Object[][] result;
+ private final SimpleQueryResult result;
Review Comment:
Either make them all private or none? It's package private, so doesn't seem
necessary to encapsulate, but don't mind which you pick.
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import javax.annotation.Nullable;
+
+public interface HistoryValidator
+{
+ Checker start(Observation observation);
+
+ void log(@Nullable Integer pk);
+
+ interface Checker extends AutoCloseable
+ {
+ void witnessRead(int pk, int count, int[] seq);
+ void witnessWrite(int pk);
Review Comment:
I'm not 100% keen on this, as it disguises the fact that the `id` is what
we're logging here. I think perhaps(?) I would rather accept the `id`
redundantly and verify it is the same, or some other change I haven't thought
of yet.
##########
test/distributed/org/apache/cassandra/distributed/api/Row.java:
##########
@@ -170,26 +282,75 @@ public UUID getUUID(String name)
return (UUID) uuid;
}
+ public UUID getUUID(String name, UUID defaultValue)
+ {
+ Object uuid = get(name, defaultValue);
+ if (uuid instanceof TimeUUID)
+ return ((TimeUUID) uuid).asUUID();
+ return (UUID) uuid;
+ }
+
public Date getTimestamp(int index)
{
return get(index);
}
+ public Date getTimestamp(int index, Date defaultValue)
+ {
+ return get(index, defaultValue);
+ }
+
public Date getTimestamp(String name)
{
return get(name);
}
+ public Date getTimestamp(String name, Date defaultValue)
+ {
+ return get(name, defaultValue);
+ }
+
public <T> Set<T> getSet(int index)
{
return get(index);
}
+ public <T> Set<T> getSet(int index, Set<T> defaultValue)
+ {
+ return get(index, defaultValue);
+ }
+
public <T> Set<T> getSet(String name)
{
return get(name);
}
+ public <T> Set<T> getSet(String name, Set<T> defaultValue)
+ {
+ return get(name, defaultValue);
+ }
+
+ // HERE
Review Comment:
?
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java:
##########
@@ -281,21 +211,151 @@ protected String preInsertStmt()
}
@Override
- Operation verifying(int operationId, IInvokableInstance instance, int
primaryKey, HistoryChecker historyChecker)
+ boolean allowMultiplePartitions() { return true; }
+
+ @Override
+ BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory()
{
- return new VerifyingOperation(operationId, instance,
serialConsistency, primaryKey, historyChecker);
+ AtomicInteger id = new AtomicInteger(0);
+
+ return (simulated, primaryKeyIndex) -> {
+ int[] partitions = IntStream.of(primaryKeyIndex).map(i ->
primaryKeys[i]).toArray();
+ return () -> accordAction(id.getAndIncrement(), simulated,
partitions);
+ };
}
- @Override
- Operation nonVerifying(int operationId, IInvokableInstance instance, int
primaryKey, HistoryChecker historyChecker)
+ private static IIsolatedExecutor.SerializableCallable<SimpleQueryResult>
query(int id, int[] partitions, int[] readOnly)
{
- return new NonVerifyingOperation(operationId, instance,
serialConsistency, primaryKey, historyChecker);
+ return () -> execute(createAccordTxn(id, partitions, readOnly), "pk",
"count", "seq");
}
- @Override
- Operation modifying(int operationId, IInvokableInstance instance, int
primaryKey, HistoryChecker historyChecker)
+ public class ReadWriteOperation extends Operation
{
- return new ModifyingOperation(operationId, instance, ANY,
serialConsistency, primaryKey, historyChecker);
+ private final IntSet allPartitions;
+ private final IntSet readOnlySet;
Review Comment:
we should also have `writeOnly`?
##########
test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java:
##########
@@ -127,16 +127,22 @@ Event setById(int id, Event event)
return byId[id] = event;
}
+ private static int eventId(int[] witnessSequence, int eventPosition)
+ {
+ return eventPosition == 0 ? -1 : witnessSequence[eventPosition - 1];
+ }
+
void witness(Observation witness, int[] witnessSequence, int start, int
end)
{
int eventPosition = witnessSequence.length;
- int eventId = eventPosition == 0 ? -1 : witnessSequence[eventPosition
- 1];
+ int eventId = eventId(witnessSequence, eventPosition);
setById(witness.id, new Event(witness.id)).log.add(new
VerboseWitness(witness.id, start, end, witnessSequence));
Event event = get(eventPosition, eventId);
recordWitness(event, witness, witnessSequence);
recordVisibleBy(event, end);
recordVisibleUntil(event, start);
+
Review Comment:
?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]