aweisberg commented on code in PR #2339:
URL: https://github.com/apache/cassandra/pull/2339#discussion_r1203057946
##########
test/unit/org/apache/cassandra/cql3/PreparedStatementsTest.java:
##########
@@ -772,8 +773,11 @@ private static List<String> columnNames(ResultSet rs)
return rs.getColumnDefinitions().asList().stream().map(d ->
d.getName()).collect(Collectors.toList());
}
- private static void updateTxnState()
+ private void updateTxnState()
{
+ // this class keeps dropping tables, so the commands_for_keys cache
points to stale data
+ AccordKeyspace.truncateTables();
Review Comment:
This should be removed at some point since Accord should be able to handle
that? Do we need a TODO to remember to remove this?
##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -595,13 +664,230 @@ public static UntypedResultSet
loadCommandRow(CommandStore commandStore, TxnId t
{
String cql = "SELECT * FROM %s.%s " +
"WHERE store_id = ? " +
+ "AND domain = ? " +
"AND txn_id=(?, ?, ?)";
return executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME,
COMMANDS),
commandStore.id(),
+ txnId.domain().ordinal(),
txnId.msb, txnId.lsb, txnId.node.id);
}
+ public static void findAllCommandsByDomain(int commandStore,
Routable.Domain domain, Set<String> columns, Observable<UntypedResultSet.Row>
callback)
+ {
+ WalkCommandsForDomain work = new WalkCommandsForDomain(commandStore,
domain, columns, Stage.READ.executor(), callback);
+ work.schedule();
+ }
+
+ private static abstract class TableWalk implements Runnable, DebuggableTask
Review Comment:
I deleted most of my comments on this CQL stuff since it seems temporary
anyways.
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -252,6 +341,47 @@ public void completeOperation(AccordSafeCommandStore store,
current = null;
}
+ <O> O mapReduceForRange(Routables<?, ?> keysOrRanges, Ranges slice,
BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+ {
+ keysOrRanges = keysOrRanges.slice(slice, Routables.Slice.Minimal);
+ switch (keysOrRanges.domain())
+ {
+ case Key:
+ {
+ AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>)
keysOrRanges;
+ for (CommandTimeseriesHolder summary :
commandsForRanges.search(keys))
+ {
+ accumulate = map.apply(summary, accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ case Range:
+ {
+ AbstractRanges<?> ranges = (AbstractRanges<?>) keysOrRanges;
+ for (Range range : ranges)
+ {
+ CommandTimeseriesHolder summary =
commandsForRanges.search(range);
+ if (summary == null)
+ continue;
+ accumulate = map.apply(summary, accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ default:
+ throw new AssertionError("Unknown domain: " +
keysOrRanges.domain());
+ }
+ return accumulate;
+ }
+
+ CommandsForRanges.Builder unbuild()
Review Comment:
Pedantic, but this doesn't `unbuild` the `CommandStore` it unbuilds a
`CommandsForRanges`.
##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -57,7 +57,7 @@
static class Context
{
final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
- final HashMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys =
new HashMap<>();
+ final TreeMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys =
new TreeMap<>();
Review Comment:
What does this need to be sorted for now?
##########
src/java/org/apache/cassandra/service/accord/CommandsForRanges.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.AbstractIterator;
+
+import accord.api.Key;
+import accord.api.RoutingKey;
+import accord.impl.CommandTimeseries;
+import accord.impl.CommandTimeseriesHolder;
+import accord.local.Command;
+import accord.local.SaveStatus;
+import accord.primitives.AbstractKeys;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.RoutableKey;
+import accord.primitives.Seekable;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+public class CommandsForRanges
+{
+ private static final class RangeCommandSummary
+ {
+ public final TxnId txnId;
+ public final SaveStatus status;
+ public final @Nullable Timestamp executeAt;
+ public final List<TxnId> deps;
+
+ RangeCommandSummary(TxnId txnId, SaveStatus status, @Nullable
Timestamp executeAt, List<TxnId> deps)
+ {
+ this.txnId = txnId;
+ this.status = status;
+ this.executeAt = executeAt;
+ this.deps = deps;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RangeCommandSummary that = (RangeCommandSummary) o;
+ return txnId.equals(that.txnId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(txnId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RangeCommandSummary{" +
+ "txnId=" + txnId +
+ ", status=" + status +
+ '}';
+ }
+ }
+
+ private enum RangeCommandSummaryLoader implements
CommandTimeseries.CommandLoader<RangeCommandSummary>
+ {
+ INSTANCE;
+
+ @Override
+ public RangeCommandSummary saveForCFK(Command command)
+ {
+ //TODO split write from read?
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TxnId txnId(RangeCommandSummary data)
+ {
+ return data.txnId;
+ }
+
+ @Override
+ public Timestamp executeAt(RangeCommandSummary data)
+ {
+ return data.executeAt;
+ }
+
+ @Override
+ public SaveStatus saveStatus(RangeCommandSummary data)
+ {
+ return data.status;
+ }
+
+ @Override
+ public List<TxnId> depsIds(RangeCommandSummary data)
+ {
+ return data.deps;
+ }
+ }
+
+ public class Builder
+ {
+ private final IntervalTree.Builder<RoutableKey, RangeCommandSummary,
Interval<RoutableKey, RangeCommandSummary>> builder;
+
+ private Builder(IntervalTree.Builder<RoutableKey, RangeCommandSummary,
Interval<RoutableKey, RangeCommandSummary>> builder)
+ {
+ this.builder = builder;
+ }
+
+ public Builder put(Ranges ranges, TxnId txnId, SaveStatus status,
Timestamp execteAt, List<TxnId> dependsOn)
+ {
+ remove(txnId);
+ return put(ranges, new RangeCommandSummary(txnId, status,
execteAt, dependsOn));
+ }
+
+ private Builder put(Ranges ranges, RangeCommandSummary summary)
+ {
+ for (Range range : ranges)
+ put(range, summary);
+ return this;
+ }
+
+ private Builder put(Range range, RangeCommandSummary summary)
+ {
+ builder.add(Interval.create(normalize(range.start(),
range.startInclusive(), true),
+ normalize(range.end(),
range.endInclusive(), false),
+ summary));
+ return this;
+ }
+
+ private Builder remove(TxnId txnId)
+ {
+ return removeIf(data -> data.txnId.equals(txnId));
+ }
+
+ private Builder removeIf(Predicate<RangeCommandSummary> predicate)
+ {
+ return removeIf((i1, i2, data) -> predicate.test(data));
+ }
+
+ private Builder
removeIf(IntervalTree.Builder.TriPredicate<RoutableKey, RoutableKey,
RangeCommandSummary> predicate)
+ {
+ builder.removeIf(predicate);
+ return this;
+ }
+
+ public void apply()
+ {
+ rangesToCommands = builder.build();
+ }
+ }
+
+ private IntervalTree<RoutableKey, RangeCommandSummary,
Interval<RoutableKey, RangeCommandSummary>> rangesToCommands =
IntervalTree.emptyTree();
+
+ public Iterable<CommandTimeseriesHolder> search(AbstractKeys<Key, ?> keys)
+ {
+ // group by the keyspace, as ranges are based off TokenKey, which is
scoped to a range
+ Map<String, List<Key>> groupByKeyspace = new TreeMap<>();
+ for (Key key : keys)
+ groupByKeyspace.computeIfAbsent(((PartitionKey) key).keyspace(),
ignore -> new ArrayList<>()).add(key);
+ return () -> new AbstractIterator<CommandTimeseriesHolder>()
+ {
+ Iterator<String> ksIt = groupByKeyspace.keySet().iterator();
+ Iterator<Map.Entry<Range, Set<RangeCommandSummary>>> rangeIt;
+
+ @Override
+ protected CommandTimeseriesHolder computeNext()
+ {
+ while (true)
+ {
+ if (rangeIt != null && rangeIt.hasNext())
+ {
+ Map.Entry<Range, Set<RangeCommandSummary>> next =
rangeIt.next();
+ return result(next.getKey(), next.getValue());
+ }
+ rangeIt = null;
+ if (!ksIt.hasNext())
+ {
+ ksIt = null;
+ return endOfData();
+ }
+ String ks = ksIt.next();
+ List<Key> keys = groupByKeyspace.get(ks);
+ Map<Range, Set<RangeCommandSummary>> groupByRange = new
TreeMap<>(Range::compare);
Review Comment:
Seems like this could just be a list since keys should already be sorted?
##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -231,6 +276,7 @@ private static ColumnMetadata getColumn(TableMetadata
metadata, String name)
"accord commands per key",
"CREATE TABLE %s ("
+ "store_id int, "
+ + "key_hash blob, " // can't use "token" as this is restricted
word in CQL
Review Comment:
Nit, since we generally call them `Token` it might be clearer to call it
`key_token`?
##########
src/java/org/apache/cassandra/dht/Token.java:
##########
@@ -41,6 +42,16 @@ public abstract class Token implements RingPosition<Token>,
Serializable
public abstract ByteBuffer toByteArray(Token token);
public abstract Token fromByteArray(ByteBuffer bytes);
+ public byte[] toOrderedByteArray(Token token, ByteComparable.Version
version)
+ {
+ return ByteSourceInverse.readBytes(asComparableBytes(token,
version));
+ }
+
+ public Token fromOrderedByteArray(byte[] bytes, ByteComparable.Version
version)
Review Comment:
Happens to be unused, but fine.
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -105,6 +191,9 @@ public boolean inStore()
@Override
protected void registerHistoricalTransactions(Deps deps)
{
+ // TODO (impl) : Its not clear why this exists
+ // accord.coordinate.CoordinateSyncPoint.coordinate(accord.local.Node,
accord.primitives.TxnId, accord.primitives.Seekables<?,?>)
Review Comment:
As mentioned later it seems like for bootstrap we want to load in some
historical information about what transactions existed? @belliottsmith can you
explain?
I don't understand the cases where it is skipping keys either.
##########
src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java:
##########
@@ -174,19 +187,34 @@ private <O> O mapReduceForKey(Routables<?, ?>
keysOrRanges, Ranges slice, BiFunc
if (accumulate.equals(terminalValue))
return accumulate;
}
- break;
+ }
+ break;
case Range:
- // TODO (required): implement
- throw new UnsupportedOperationException();
+ {
+ // TODO (correctness): if a range is used, then the range must
exist in the PreLoadContext, else the commandsForKeys won't be in-sync... can
this be detected?
Review Comment:
It can, it's a safe command store so it still has the preload context?
##########
src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java:
##########
@@ -43,6 +43,19 @@ public AccordVerbHandler(Node node)
public void doVerb(Message<T> message) throws IOException
{
logger.debug("Receiving {} from {}", message.payload, message.from());
- message.payload.process(node, EndpointMapping.getId(message.from()),
message);
+ T request = message.payload;
+ Node.Id from = EndpointMapping.getId(message.from());
Review Comment:
Move this into the innermost `if` since it is unused outside.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]