ifesdjeen commented on code in PR #144:
URL: https://github.com/apache/cassandra-accord/pull/144#discussion_r1861678697
##########
accord-core/src/test/java/accord/burn/BurnTest.java:
##########
@@ -18,675 +18,13 @@
package accord.burn;
-import java.time.Duration;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.function.Function;
-import java.util.function.IntSupplier;
-import java.util.function.LongSupplier;
-import java.util.function.Supplier;
-import java.util.stream.IntStream;
-import java.util.zip.CRC32;
-
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import accord.api.Key;
-import accord.burn.random.FrequentLargeRange;
-import accord.impl.MessageListener;
-import accord.impl.PrefixedIntHashKey;
-import accord.impl.TopologyFactory;
-import accord.impl.basic.Cluster;
-import accord.impl.basic.Cluster.Stats;
-import accord.impl.basic.Packet;
-import accord.impl.basic.PendingQueue;
-import accord.impl.basic.PendingRunnable;
-import accord.impl.basic.MonitoredPendingQueue;
-import accord.impl.basic.RandomDelayQueue;
-import accord.impl.basic.RandomDelayQueue.Factory;
-import accord.impl.basic.SimulatedDelayedExecutorService;
-import accord.impl.list.ListAgent;
-import accord.impl.list.ListQuery;
-import accord.impl.list.ListRead;
-import accord.impl.list.ListRequest;
-import accord.impl.list.ListResult;
-import accord.impl.list.ListUpdate;
-import accord.local.CommandStore;
-import accord.local.Node;
-import accord.local.Node.Id;
-import accord.messages.MessageType;
-import accord.messages.Reply;
-import accord.primitives.Keys;
-import accord.primitives.Range;
-import accord.primitives.Ranges;
-import accord.primitives.Timestamp;
-import accord.primitives.Txn;
-import accord.topology.Shard;
-import accord.topology.Topology;
-import accord.utils.DefaultRandom;
-import accord.utils.Gen;
-import accord.utils.Gens;
-import accord.utils.RandomSource;
-import accord.utils.Utils;
-import accord.utils.async.AsyncExecutor;
-import accord.utils.async.TimeoutUtils;
-import accord.verify.CompositeVerifier;
-import accord.verify.ElleVerifier;
-import accord.verify.StrictSerializabilityVerifier;
-import accord.verify.Verifier;
-import org.agrona.collections.Int2ObjectHashMap;
-import org.agrona.collections.IntHashSet;
-
-import static accord.impl.PrefixedIntHashKey.forHash;
-import static accord.impl.PrefixedIntHashKey.range;
-import static accord.impl.PrefixedIntHashKey.ranges;
-import static accord.primitives.Txn.Kind.EphemeralRead;
-import static accord.utils.Invariants.illegalArgument;
-import static accord.utils.Utils.toArray;
-public class BurnTest
+public class BurnTest extends BurnTestBase
{
-    private static final Logger logger = LoggerFactory.getLogger(BurnTest.class);
-
-    /**
-     * Min hash value for the test domain, this value must be respected by the hash function
-     * @see {@link BurnTest#hash(int)}
-     */
-    public static final int HASH_RANGE_START = 0;
-    /**
-     * Max hash value for the test domain, this value must be respected by the hash function
-     * @see {@link BurnTest#hash(int)}
-     */
-    public static final int HASH_RANGE_END = 1 << 16;
-    private static final Range[] EMPTY_RANGES = new Range[0];
-
-    static List<Packet> generate(RandomSource random, MessageListener listener, Function<? super CommandStore, AsyncExecutor> executor, List<Id> clients, List<Id> nodes, int[] keys, int operations)
-    {
-        List<Packet> packets = new ArrayList<>();
-        Int2ObjectHashMap<int[]> prefixKeyUpdates = new Int2ObjectHashMap<>();
-        double readInCommandStore = random.nextDouble();
-        Function<int[], Range> nextRange = randomRanges(random);
-
-        for (int count = 0 ; count < operations ; ++count)
-        {
-            int finalCount = count;
-            Id client = clients.get(random.nextInt(clients.size()));
-            Id node = nodes.get(random.nextInt(nodes.size()));
-
-            boolean isRangeQuery = random.nextBoolean();
-            String description;
-            Function<Node, Txn> txnGenerator;
-            if (isRangeQuery)
-            {
-                description = "range";
-                txnGenerator = n -> {
-                    int[] prefixes = prefixes(n.topology().current());
-
-                    int rangeCount = 1 + random.nextInt(2);
-                    List<Range> requestRanges = new ArrayList<>();
-                    while (--rangeCount >= 0)
-                        requestRanges.add(nextRange.apply(prefixes));
-                    Ranges ranges = Ranges.of(requestRanges.toArray(EMPTY_RANGES));
-                    ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, false, ranges, ranges);
-                    ListQuery query = new ListQuery(client, finalCount, false);
-                    return new Txn.InMemory(ranges, read, query);
-                };
-            }
-            else
-            {
-                description = "key";
-                txnGenerator = n -> {
-                    int[] prefixes = prefixes(n.topology().current());
-
-                    boolean isWrite = random.nextBoolean();
-                    int readCount = 1 + random.nextInt(2);
-                    int writeCount = isWrite ? 1 + random.nextInt(2) : 0;
-                    Txn.Kind kind = isWrite ? Txn.Kind.Write : readCount == 1 ? EphemeralRead : Txn.Kind.Read;
-
-                    TreeSet<Key> requestKeys = new TreeSet<>();
-                    IntHashSet readValues = new IntHashSet();
-                    while (readCount-- > 0)
-                        requestKeys.add(randomKey(random, prefixes, keys, readValues));
-
-                    ListUpdate update = isWrite ? new ListUpdate(executor) : null;
-                    IntHashSet writeValues = isWrite ? new IntHashSet() : null;
-                    while (writeCount-- > 0)
-                    {
-                        PrefixedIntHashKey.Key key = randomKey(random, prefixes, keys, writeValues);
-                        int i = Arrays.binarySearch(keys, key.key);
-                        int[] keyUpdateCounter = prefixKeyUpdates.computeIfAbsent(key.prefix, ignore -> new int[keys.length]);
-                        update.put(key, ++keyUpdateCounter[i]);
-                    }
-
-                    Keys readKeys = new Keys(requestKeys);
-                    if (isWrite)
-                        requestKeys.addAll(update.keySet());
-                    ListRead read = new ListRead(random.decide(readInCommandStore) ? Function.identity() : executor, kind == EphemeralRead, readKeys, new Keys(requestKeys));
-                    ListQuery query = new ListQuery(client, finalCount, kind == EphemeralRead);
-                    return new Txn.InMemory(kind, new Keys(requestKeys), read, query, update);
-                };
-            }
-            packets.add(new Packet(client, node, Long.MAX_VALUE, count, new ListRequest(description, txnGenerator, listener)));
-        }
-
-        return packets;
-    }
-
-    private static int[] prefixes(Topology topology)
-    {
-        IntHashSet uniq = new IntHashSet();
-        for (Shard shard : topology.shards())
-            uniq.add(((PrefixedIntHashKey) shard.range.start()).prefix);
-        int[] prefixes = new int[uniq.size()];
-        IntHashSet.IntIterator it = uniq.iterator();
-        for (int i = 0; it.hasNext(); i++)
-            prefixes[i] = it.nextValue();
-        Arrays.sort(prefixes);
-        return prefixes;
-    }
-
-    private static Function<int[], Range> randomRanges(RandomSource rs)
-    {
-        int selection = rs.nextInt(0, 2);
-        switch (selection)
-        {
-            case 0: // uniform
-                return (prefixes) -> randomRange(rs, prefixes, () -> rs.nextInt(0, 1 << 13) + 1);
-            case 1: // zipf
-                int domain = HASH_RANGE_END - HASH_RANGE_START + 1;
-                int splitSize = 100;
-                int interval = domain / splitSize;
-                int[] splits = new int[6];
-                for (int i = 0; i < splits.length; i++)
-                    splits[i] = i == 0 ? interval : splits[i - 1] * 2;
-                int[] splitsToPick = splits;
-                int bias = rs.nextInt(0, 3); // small, large, random
-                if (bias != 0)
-                    splitsToPick = Arrays.copyOf(splits, splits.length);
-                if (bias == 1)
-                    Utils.reverse(splitsToPick);
-                else if (bias == 2)
-                    Utils.shuffle(splitsToPick, rs);
-                Gen.IntGen zipf = Gens.pickZipf(splitsToPick);
-                return (prefixes) -> randomRange(rs, prefixes, () -> {
-                    int value = zipf.nextInt(rs);
-                    int idx = Arrays.binarySearch(splits, value);
-                    int min = idx == 0 ? 0 : splits[idx - 1];
-                    return rs.nextInt(min, value) + 1;
-                });
-            default:
-                throw new AssertionError("Unexpected value: " + selection);
-        }
-    }
-
-    private static Range randomRange(RandomSource random, int[] prefixes, IntSupplier rangeSizeFn)
-    {
-        int prefix = random.pickInt(prefixes);
-        int i = random.nextInt(HASH_RANGE_START, HASH_RANGE_END);
-        int rangeSize = rangeSizeFn.getAsInt();
-        int j = i + rangeSize;
-        if (j > HASH_RANGE_END)
-        {
-            int delta = j - HASH_RANGE_END;
-            j = HASH_RANGE_END;
-            i -= delta;
-            // safety check; this shouldn't happen unless the configs were changed in an unsafe way
-            if (i < HASH_RANGE_START)
-                i = HASH_RANGE_START;
-        }
-        return range(forHash(prefix, i), forHash(prefix, j));
-    }
-
-    private static PrefixedIntHashKey.Key randomKey(RandomSource random, int[] prefixes, int[] keys, Set<Integer> seen)
-    {
-        int prefix = random.pickInt(prefixes);
-        int key;
-        do
-        {
-            key = random.pickInt(keys);
-        }
-        while (!seen.add(key));
-        return PrefixedIntHashKey.key(prefix, key, hash(key));
-    }
-
-    /**
-     * This method uses a more limited range than the default for the following reasons:
-     *
-     * 1) easier to debug smaller numbers
-     * 2) adds hash collisions (multiple keys map to the same hash)
-     */
-    private static int hash(int key)
-    {
-        CRC32 crc32c = new CRC32();
-        crc32c.update(key);
-        crc32c.update(key >> 8);
-        crc32c.update(key >> 16);
-        crc32c.update(key >> 24);
-        return (int) crc32c.getValue() & 0xffff;
-    }
-
-    @SuppressWarnings("unused")
-    static void reconcile(long seed, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int prefixCount, int operations, int concurrency) throws ExecutionException, InterruptedException
-    {
-        RandomSource random1 = new DefaultRandom(), random2 = new DefaultRandom();
-
-        random1.setSeed(seed);
-        random2.setSeed(seed);
-        ExecutorService exec = Executors.newFixedThreadPool(2);
-        RandomDelayQueue.ReconcilingQueueFactory factory = new RandomDelayQueue.ReconcilingQueueFactory(seed);
-        Future<?> f1 = exec.submit(() -> burn(random1, topologyFactory, clients, nodes, keyCount, prefixCount, operations, concurrency, factory.get(true)));
-        Future<?> f2 = exec.submit(() -> burn(random2, topologyFactory, clients, nodes, keyCount, prefixCount, operations, concurrency, factory.get(false)));
-        exec.shutdown();
-        f1.get();
-        f2.get();
-    }
-
-    static void burn(RandomSource random, TopologyFactory topologyFactory, List<Id> clients, List<Id> nodes, int keyCount, int prefixCount, int operations, int concurrency, PendingQueue pendingQueue)
-    {
-        List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());
-        AtomicLong progress = new AtomicLong();
-        MonitoredPendingQueue queue = new MonitoredPendingQueue(failures, progress, 5L, TimeUnit.MINUTES, pendingQueue);
-        long startNanos = System.nanoTime();
-        long startLogicalMillis = queue.nowInMillis();
-        Consumer<Runnable> retryBootstrap;
-        {
-            RandomSource retryRandom = random.fork();
-            retryBootstrap = retry -> {
-                long delay = retryRandom.nextInt(1, 15);
-                queue.add(PendingRunnable.create(retry::run), delay, TimeUnit.SECONDS);
-            };
-        }
-        IntSupplier coordinationDelays, progressDelays, timeoutDelays;
-        {
-            RandomSource rnd = random.fork();
-            coordinationDelays = delayGenerator(rnd, 1, 100, 100, 1000);
-            progressDelays = delayGenerator(rnd, 1, 100, 100, 1000);
-            timeoutDelays = delayGenerator(rnd, 500, 800, 1000, 10000);
-        }
-        Function<BiConsumer<Timestamp, Ranges>, ListAgent> agentSupplier = onStale -> new ListAgent(random.fork(), 1000L, failures::add, retryBootstrap, onStale, coordinationDelays, progressDelays, timeoutDelays, pendingQueue::nowInMillis);
-
-        Supplier<LongSupplier> nowSupplier = () -> {
-            RandomSource forked = random.fork();
-            // TODO (expected): meta-randomise scale of clock drift
-            return FrequentLargeRange.builder(forked)
-                                     .ratio(1, 5)
-                                     .small(50, 5000, TimeUnit.MICROSECONDS)
-                                     .large(1, 10, TimeUnit.MILLISECONDS)
-                                     .build()
-                                     .mapAsLong(j -> Math.max(0, queue.nowInMillis() + TimeUnit.NANOSECONDS.toMillis(j)))
-                                     .asLongSupplier(forked);
-        };
-
-        SimulatedDelayedExecutorService globalExecutor = new SimulatedDelayedExecutorService(queue, new ListAgent(random.fork(), 1000L, failures::add, retryBootstrap, (i1, i2) -> {
-            throw new IllegalAccessError("Global executor should never get a stale event");
-        }, coordinationDelays, progressDelays, timeoutDelays, queue::nowInMillis));
-        Verifier verifier = createVerifier(keyCount * prefixCount);
-        Function<CommandStore, AsyncExecutor> executor = ignore -> globalExecutor;
-
-        MessageListener listener = MessageListener.get();
-
-        int[] prefixes = IntStream.concat(IntStream.of(0), IntStream.generate(() -> random.nextInt(1024))).distinct().limit(prefixCount).toArray();
-        int[] newPrefixes = Arrays.copyOfRange(prefixes, 1, prefixes.length);
-        Arrays.sort(prefixes);
-
-        int[] keys = IntStream.range(0, keyCount).toArray();
-        Packet[] requests = toArray(generate(random, listener, executor, clients, nodes, keys, operations), Packet[]::new);
-        int[] starts = new int[requests.length];
-        Packet[] replies = new Packet[requests.length];
-
-        AtomicInteger acks = new AtomicInteger();
-        AtomicInteger nacks = new AtomicInteger();
-        AtomicInteger lost = new AtomicInteger();
-        AtomicInteger truncated = new AtomicInteger();
-        AtomicInteger recovered = new AtomicInteger();
-        AtomicInteger failedToCheck = new AtomicInteger();
-        AtomicInteger clock = new AtomicInteger();
-        AtomicInteger requestIndex = new AtomicInteger();
-        Queue<Packet> initialRequests = new ArrayDeque<>();
-        for (int max = Math.min(concurrency, requests.length) ; requestIndex.get() < max ; )
-        {
-            int i = requestIndex.getAndIncrement();
-            starts[i] = clock.incrementAndGet();
-            initialRequests.add(requests[i]);
-        }
-
-        // not used for atomicity, just for encapsulation
-        AtomicReference<Runnable> onSubmitted = new AtomicReference<>();
-        Consumer<Packet> responseSink = packet -> {
-            if (replies[(int)packet.replyId] != null)
-                return;
-
-            if (requestIndex.get() < requests.length)
-            {
-                int i = requestIndex.getAndIncrement();
-                starts[i] = clock.incrementAndGet();
-                queue.addNoDelay(requests[i]);
-                if (i == requests.length - 1)
-                    onSubmitted.get().run();
-            }
-            if (packet.message instanceof Reply.FailureReply)
-            {
-                failures.add(new AssertionError("Unexpected failure in list reply", ((Reply.FailureReply) packet.message).failure));
-                return;
-            }
-            ListResult reply = (ListResult) packet.message;
-
-            try
-            {
-                if (!reply.isSuccess() && reply.status() == ListResult.Status.HeartBeat)
-                    return; // interrupted; will fetch our actual reply once the rest of the simulation is finished (but wanted to send another request to keep the correct number in flight)
-
-                int start = starts[(int)packet.replyId];
-                int end = clock.incrementAndGet();
-                logger.debug("{} at [{}, {}]", reply, start, end);
-                replies[(int)packet.replyId] = packet;
-
-                if (!reply.isSuccess())
-                {
-                    switch (reply.status())
-                    {
-                        case Lost:        lost.incrementAndGet(); break;
-                        case Invalidated: nacks.incrementAndGet(); break;
-                        case Failure:     failedToCheck.incrementAndGet(); break;
-                        case Truncated:   truncated.incrementAndGet(); break;
-                        // txn may have been applied, but the client saw a timeout, so the response isn't known
-                        case Other:       break;
-                        default:          throw new AssertionError("Unexpected fault: " + reply.status());
-                    }
-                    return;
-                }
-
-                progress.incrementAndGet();
-                switch (reply.status())
-                {
-                    default: throw new AssertionError("Unhandled status: " + reply.status());
-                    case Applied:         acks.incrementAndGet(); break;
-                    case RecoveryApplied: recovered.incrementAndGet(); // NOTE: technically this might have been applied by the coordinator and it simply timed out
-                }
-                // TODO (correctness): when a keyspace is removed, the history/validator isn't cleaned up...
-                // the current logic for add keyspace only knows what is there, so an ABA problem exists where keyspaces
-                // may come back... logically this is a problem as the history doesn't get reset, but practically that
-                // is fine as the backing map and the validator are consistent
-                Verifier.Checker check = verifier.witness(start, end);
-                for (int i = 0 ; i < reply.read.length ; ++i)
-                {
-                    Key key = reply.responseKeys.get(i);
-                    int prefix = prefix(key);
-                    int keyValue = key(key);
-                    int k = Arrays.binarySearch(keys, keyValue);
-
-                    int[] read = reply.read[i];
-                    int write = reply.update == null ? -1 : reply.update.getOrDefault(key, -1);
-
-                    int prefixIndex = Arrays.binarySearch(prefixes, prefix);
-                    int index = keyCount * prefixIndex + k;
-                    if (read != null)
-                        check.read(index, read);
-                    if (write >= 0)
-                        check.write(index, write);
-                }
-                check.close();
-            }
-            catch (Throwable t)
-            {
-                failures.add(t);
-            }
-        };
-
-        Map<MessageType, Stats> messageStatsMap;
-        try
-        {
-            messageStatsMap = Cluster.run(toArray(nodes, Id[]::new), newPrefixes, listener, () -> queue,
-                                          (id, onStale) -> globalExecutor.withAgent(agentSupplier.apply(onStale)),
-                                          queue::checkFailures,
-                                          responseSink, random::fork, nowSupplier,
-                                          topologyFactory, initialRequests::poll,
-                                          onSubmitted::set,
-                                          ignore -> {}
-            );
-            verifier.close();
-        }
-        catch (Throwable t)
-        {
-            logger.info("Keys: " + Arrays.toString(keys));
-            logger.info("Prefixes: " + Arrays.toString(prefixes));
-            for (int i = 0 ; i < requests.length ; ++i)
-            {
-                logger.info("{}", requests[i]);
-                logger.info("\t\t" + replies[i]);
-            }
-            throw t;
-        }
-
-        int observedOperations = acks.get() + recovered.get() + nacks.get() + lost.get() + truncated.get();
-        logger.info("nodes: {}, rf: {}. Received {} acks, {} recovered, {} nacks, {} lost, {} truncated ({} total) to {} operations", nodes.size(), topologyFactory.rf, acks.get(), recovered.get(), nacks.get(), lost.get(), truncated.get(), observedOperations, operations);
-        logger.info("Message counts: {}", statsInDescOrder(messageStatsMap));
-        logger.info("Took {} and in logical time of {}", Duration.ofNanos(System.nanoTime() - startNanos), Duration.ofMillis(queue.nowInMillis() - startLogicalMillis));
-        if (clock.get() != operations * 2 || observedOperations != operations)
-        {
-            StringBuilder sb = new StringBuilder();
-            for (int i = 0 ; i < requests.length ; ++i)
-            {
-                // since this only happens when operations are lost, only log the ones without a reply to lower the amount of noise
-                if (replies[i] == null)
-                {
-                    sb.setLength(0);
-                    sb.append(requests[i]).append("\n\t\t").append(replies[i]);
-                    logger.info(sb.toString());
-                }
-            }
-            if (clock.get() != operations * 2) throw new AssertionError("Incomplete set of responses; clock=" + clock.get() + ", expected operations=" + (operations * 2));
-            else throw new AssertionError("Incomplete set of responses; ack+recovered+other+nacks+lost+truncated=" + observedOperations + ", expected operations=" + (operations * 2));
-        }
-    }
-
-    private static IntSupplier delayGenerator(RandomSource rnd, int absoluteMin, int absoluteMaxMin, int absoluteMinMax, int absoluteMax)
-    {
-        int minDelay = rnd.nextInt(absoluteMin, absoluteMaxMin);
-        int maxDelay = rnd.nextInt(Math.max(absoluteMinMax, minDelay), absoluteMax);
-        if (rnd.nextBoolean())
-        {
-            int medianDelay = rnd.nextInt(minDelay, maxDelay);
-            return () -> rnd.nextBiasedInt(minDelay, medianDelay, maxDelay);
-        }
-        return () -> rnd.nextInt(minDelay, maxDelay);
-    }
-
-    private static String statsInDescOrder(Map<MessageType, Stats> statsMap)
-    {
-        List<Stats> stats = new ArrayList<>(statsMap.values());
-        stats.sort(Comparator.comparingInt(s -> -s.count()));
-        return stats.toString();
-    }
-
-    private static Verifier createVerifier(int keyCount)
-    {
-        if (!ElleVerifier.Support.allowed())
-            return new StrictSerializabilityVerifier("", keyCount);
-        return CompositeVerifier.create(new StrictSerializabilityVerifier("", keyCount),
-                                        new ElleVerifier());
-    }
-
-    public static void main(String[] args)
Review Comment:
Moved back.
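
For orientation: with this refactor BurnTest keeps only the JUnit entry points, while everything deleted above moves into the base class. A minimal sketch of the resulting shape, assuming BurnTestBase now hosts the moved machinery (generate, burn, reconcile, createVerifier, main); the test method and the testOne driver call are invented for illustration, not taken from the patch:

public class BurnTest extends BurnTestBase
{
    @Test
    public void testRandom()
    {
        // Hypothetical: delegates to the inherited harness in BurnTestBase;
        // the real test methods are outside this hunk.
        testOne(System.nanoTime());
    }
}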
##########
accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java:
##########
@@ -18,27 +18,27 @@
package accord.impl.mock;
+import accord.api.ConfigurationService;
import accord.api.MessageSink;
import accord.api.TestableConfigurationService;
import accord.local.Node;
import accord.primitives.Ranges;
import accord.topology.Topology;
import accord.utils.EpochFunction;
+import accord.utils.Invariants;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
-import org.junit.jupiter.api.Assertions;
-
import java.util.*;
public class MockConfigurationService implements TestableConfigurationService
{
    private final MessageSink messageSink;
    private final List<Topology> epochs = new ArrayList<>();
-    private final Map<Long, EpochReady> acks = new HashMap<>();
+    private final Map<Long, ConfigurationService.EpochReady> acks = new HashMap<>();
Review Comment:
Reverted all changes that are not directly related to the patch, including this one.
##########
accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java:
##########
@@ -153,7 +155,8 @@ public void validateRead(Command current)
            return;
        // Journal will not have result persisted. This part is here for test purposes and ensuring that we have strict object equality.
-        Command reconstructed = journal.reconstruct(id, current.txnId());
+        // TODO: redundant/durable before
Review Comment:
Fixed this; you are right, I should not have left it as a TODO.
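
For readers following along, the validation in question reconstructs the command from the journal and demands strict equality with the in-memory state. A rough sketch of that shape; only journal.reconstruct(id, current.txnId()) is taken from the hunk, the surrounding method body and error message are assumed:

    public void validateRead(Command current)
    {
        // Rebuild the command purely from what the journal persisted; any field
        // the journal fails to persist then trips the equality check below.
        Command reconstructed = journal.reconstruct(id, current.txnId());
        if (!current.equals(reconstructed))
            throw new AssertionError("Reconstructed command does not match in-memory state: " + reconstructed + " != " + current);
    }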
##########
accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java:
##########
@@ -18,27 +18,27 @@
package accord.impl.mock;
+import accord.api.ConfigurationService;
import accord.api.MessageSink;
import accord.api.TestableConfigurationService;
import accord.local.Node;
import accord.primitives.Ranges;
import accord.topology.Topology;
import accord.utils.EpochFunction;
+import accord.utils.Invariants;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
-import org.junit.jupiter.api.Assertions;
-
import java.util.*;
public class MockConfigurationService implements TestableConfigurationService
{
    private final MessageSink messageSink;
    private final List<Topology> epochs = new ArrayList<>();
-    private final Map<Long, EpochReady> acks = new HashMap<>();
+    private final Map<Long, ConfigurationService.EpochReady> acks = new HashMap<>();
    private final List<AsyncResult<Void>> syncs = new ArrayList<>();
-    private final List<Listener> listeners = new ArrayList<>();
+    private final List<ConfigurationService.Listener> listeners = new ArrayList<>();
Review Comment:
I will double check on commit / when squashing; I think these are artifacts of moving files back and forth.
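
Incidentally, the import churn above reflects swapping JUnit's Assertions for accord's own Invariants checks inside the mock. A hypothetical before/after of such a call site (both lines are illustrative, not from the patch):

    // before: test-framework assertion, usable only under JUnit
    Assertions.assertTrue(epoch <= epochs.size(), "unknown epoch");
    // after: library-level invariant check (assumed call form)
    Invariants.checkState(epoch <= epochs.size(), "unknown epoch");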
##########
accord-core/src/test/java/accord/impl/basic/Cluster.java:
##########
@@ -778,7 +780,7 @@ private static void verifyConsistentRestore(Int2ObjectHashMap<NavigableMap<TxnId
        if (!store.unsafeCommands().containsKey(txnId))
        {
            Command beforeCommand = before.get(txnId);
-            if (beforeCommand.saveStatus() == SaveStatus.Erased)
Review Comment:
Agreed, this should not be the case; fixed now.
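
To make the agreed behaviour concrete: any command known before the restart must survive restore, with no carve-out for erased commands. A rough sketch under that assumption, reusing only names from the hunk (the error message is invented):

    if (!store.unsafeCommands().containsKey(txnId))
    {
        Command beforeCommand = before.get(txnId);
        // No special case for SaveStatus.Erased any more: losing any
        // previously-known command during restore fails the test.
        throw new AssertionError("Lost command " + txnId + " (" + beforeCommand.saveStatus() + " before restart)");
    }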
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]