iamaleksey commented on code in PR #3662:
URL: https://github.com/apache/cassandra/pull/3662#discussion_r1836924406


##########
src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java:
##########
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.RoutingKey;
+import accord.impl.DurabilityScheduling;
+import accord.impl.progresslog.DefaultProgressLog;
+import accord.impl.progresslog.TxnStateKind;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.MaxConflicts;
+import accord.local.Node;
+import accord.local.RedundantBefore;
+import accord.local.RejectBefore;
+import accord.primitives.Routable;
+import accord.primitives.Status;
+import accord.primitives.Timestamp;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChain;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordCommandStore;
+import org.apache.cassandra.service.accord.AccordKeyspace;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.AccordStateCache;
+import org.apache.cassandra.service.accord.CommandStoreTxnBlockedGraph;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
+import org.apache.cassandra.service.consensus.migration.TableMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
+
+import static java.lang.String.format;
+import static java.util.Comparator.comparing;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
+
+public class AccordDebugKeyspace extends VirtualKeyspace
+{
+    // CQL tuple used for timestamp-valued columns; components are {epoch, hlc, flags, node}
+    private static final TupleType TIMESTAMP_TYPE =
+        new TupleType(List.of(LongType.instance, LongType.instance, 
Int32Type.instance, Int32Type.instance));
+    private static final String TIMESTAMP_TYPE_STRING = 
TIMESTAMP_TYPE.asCQL3Type().toString();
+
+    // CQL tuple used for transaction-id-valued columns; components are {epoch, hlc, kind, domain, node}
+    private static final TupleType TXN_ID_TYPE =
+        new TupleType(List.of(LongType.instance, LongType.instance, 
UTF8Type.instance, UTF8Type.instance, Int32Type.instance));
+    private static final String TXN_ID_TYPE_STRING = 
TXN_ID_TYPE.asCQL3Type().toString();
+
+    // CQL tuple used for routing keys and range bounds; components are {table_id, token} or {table_id, +Inf/-Inf}
+    private static final TupleType ROUTING_KEY_TYPE = new 
TupleType(List.of(UUIDType.instance, UTF8Type.instance));
+    private static final String ROUTING_KEY_TYPE_STRING = 
ROUTING_KEY_TYPE.asCQL3Type().toString();
+
+    public static final AccordDebugKeyspace instance = new 
AccordDebugKeyspace();
+    /**
+     * Creates the keyspace named by VIRTUAL_ACCORD_DEBUG and registers the nine virtual
+     * tables listed below, each constructed against that same keyspace name.
+     * Private: the only construction site visible here is the static {@code instance} field.
+     */
+    private AccordDebugKeyspace()
+    {
+        super(VIRTUAL_ACCORD_DEBUG, List.of(
+            new CommandStoreCacheTable(VIRTUAL_ACCORD_DEBUG),
+            new DurabilitySchedulingTable(VIRTUAL_ACCORD_DEBUG),
+            new DurableBeforeTable(VIRTUAL_ACCORD_DEBUG),
+            new MaxConflictsTable(VIRTUAL_ACCORD_DEBUG),
+            new MigrationStateTable(VIRTUAL_ACCORD_DEBUG),
+            new ProgressLogTable(VIRTUAL_ACCORD_DEBUG),
+            new RedundantBeforeTable(VIRTUAL_ACCORD_DEBUG),
+            new RejectBeforeTable(VIRTUAL_ACCORD_DEBUG),
+            new TxnBlockedByTable(VIRTUAL_ACCORD_DEBUG)
+        ));
+    }
+
+    public static final class CommandStoreCacheTable extends 
AbstractVirtualTable
+    {
+        private CommandStoreCacheTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord Command Store Cache Metrics",
+                        "CREATE TABLE command_store_cache (\n" +
+                        "  id int,\n" +
+                        "  scope text,\n" +
+                        "  queries bigint,\n" +
+                        "  hits bigint,\n" +
+                        "  misses bigint,\n" +
+                        "  PRIMARY KEY (id, scope)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            CommandStores stores = ((AccordService) 
AccordService.instance()).node().commandStores();
+
+            AsyncChain<List<Map<String, AccordStateCache.ImmutableStats>>> 
statsByStoreChain = stores.map(store -> {
+                Map<String, AccordStateCache.ImmutableStats> snapshots = new 
HashMap<>(3);
+                AccordCommandStore accordStore = (AccordCommandStore) 
store.commandStore();
+                snapshots.put(AccordKeyspace.COMMANDS, 
accordStore.commandCache().statsSnapshot());
+                snapshots.put(AccordKeyspace.COMMANDS_FOR_KEY, 
accordStore.commandsForKeyCache().statsSnapshot());
+                snapshots.put(AccordKeyspace.TIMESTAMPS_FOR_KEY, 
accordStore.timestampsForKeyCache().statsSnapshot());
+                return snapshots;
+            });
+
+            List<Map<String, AccordStateCache.ImmutableStats>> statsByStore = 
getBlockingAndRethrow(statsByStoreChain);
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (int storeID : stores.ids())
+            {
+                Map<String, AccordStateCache.ImmutableStats> storeStats = 
statsByStore.get(storeID);
+                addRow(storeStats.get(AccordKeyspace.COMMANDS), result, 
storeID, AccordKeyspace.COMMANDS);
+                addRow(storeStats.get(AccordKeyspace.COMMANDS_FOR_KEY), 
result, storeID, AccordKeyspace.COMMANDS_FOR_KEY);
+                addRow(storeStats.get(AccordKeyspace.TIMESTAMPS_FOR_KEY), 
result, storeID, AccordKeyspace.TIMESTAMPS_FOR_KEY);
+            }
+
+            return result;
+        }
+
+        private static void addRow(AccordStateCache.ImmutableStats stats, 
SimpleDataSet result, int storeID, String scope)
+        {
+            result.row(storeID, scope);
+            result.column("queries", stats.queries);
+            result.column("hits", stats.hits);
+            result.column("misses", stats.misses);
+        }
+    }
+
+    // TODO (consider): use a different type for the three timestamps in micros
+    /**
+     * Virtual table {@code durability_scheduling}: one row per range exposed by the
+     * durability scheduling view, keyed by the (range_start, range_end) pair.
+     */
+    public static final class DurabilitySchedulingTable extends AbstractVirtualTable
+    {
+        private DurabilitySchedulingTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord per-Range Durability Scheduling State",
+                        "CREATE TABLE durability_scheduling (\n" +
+                           format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  node_offset int,\n" +
+                        "  \"index\" int,\n" +
+                        "  number_of_splits int,\n" +
+                        "  range_started_at bigint,\n" +
+                        "  cycle_started_at bigint,\n" +
+                        "  retry_delay_micros bigint,\n" +
+                        "  is_defunct boolean,\n" +
+                        "  PRIMARY KEY ((range_start, range_end))" +
+                        ')'));
+        }
+
+        /** Steps through the scheduling view and writes one row for each position it advances to. */
+        @Override
+        public DataSet data()
+        {
+            AccordService service = (AccordService) AccordService.instance();
+            DurabilityScheduling.ImmutableView cursor = service.durabilityScheduling();
+            SimpleDataSet rows = new SimpleDataSet(metadata());
+
+            while (cursor.advance())
+            {
+                rows.row(decompose(cursor.range().start()), decompose(cursor.range().end()));
+                rows.column("node_offset", cursor.nodeOffset());
+                rows.column("index", cursor.index());
+                rows.column("number_of_splits", cursor.numberOfSplits());
+                rows.column("range_started_at", cursor.rangeStartedAtMicros());
+                rows.column("cycle_started_at", cursor.cycleStartedAtMicros());
+                rows.column("retry_delay_micros", cursor.retryDelayMicros());
+                rows.column("is_defunct", cursor.isDefunct());
+            }
+
+            return rows;
+        }
+    }
+
+    /**
+     * Virtual table {@code durable_before}: one row per bounded entry of the node's
+     * DurableBefore state, with the majority/universal bounds rendered as text.
+     */
+    public static final class DurableBeforeTable extends AbstractVirtualTable
+    {
+        private DurableBeforeTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord Node's DurableBefore State",
+                        "CREATE TABLE durable_before (\n" +
+                           format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  majority_before text,\n" +
+                        "  universal_before text,\n" +
+                        "  PRIMARY KEY ((range_start, range_end))" +
+                        ')'));
+        }
+
+        /** Folds over the DurableBefore entries, adding one row per (start, end) bound pair. */
+        @Override
+        public DataSet data()
+        {
+            AccordService service = (AccordService) AccordService.instance();
+            DurableBefore durableBefore = service.node().durableBefore();
+
+            return durableBefore.foldlWithBounds(
+                (entry, rows, start, end) -> {
+                    rows.row(decompose(start), decompose(end));
+                    rows.column("majority_before", entry.majorityBefore.toString());
+                    rows.column("universal_before", entry.universalBefore.toString());
+                    return rows;
+                },
+                new SimpleDataSet(metadata()),
+                // never terminate the fold early
+                ignore -> false
+            );
+        }
+    }
+
+    /**
+     * Virtual table {@code max_conflicts}: one row per (command store, range) entry of each
+     * store's MaxConflicts state, with the entry's timestamp.
+     */
+    public static final class MaxConflictsTable extends AbstractVirtualTable
+    {
+        private MaxConflictsTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord per-CommandStore MaxConflicts State",
+                        "CREATE TABLE max_conflicts (\n" +
+                        "  command_store_id int,\n" +
+                           format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                           format("timestamp %s,\n", TIMESTAMP_TYPE_STRING) +
+                        "  PRIMARY KEY (command_store_id, range_start, range_end)" +
+                        ')'));
+        }
+
+        /** Collects (store id, MaxConflicts) from every store, orders by id, and emits rows. */
+        @Override
+        public DataSet data()
+        {
+            AccordService service = (AccordService) AccordService.instance();
+            CommandStores stores = service.node().commandStores();
+
+            List<Pair<Integer, MaxConflicts>> perStore =
+                getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(),
+                                                                      store.commandStore().unsafeGetMaxConflicts())));
+            perStore.sort(comparing(p -> p.left));
+
+            SimpleDataSet rows = new SimpleDataSet(metadata());
+            for (int i = 0, size = perStore.size(); i < size; ++i)
+            {
+                Pair<Integer, MaxConflicts> current = perStore.get(i);
+                int storeId = current.left;
+
+                current.right.foldlWithBounds(
+                    (timestamp, acc, start, end) -> {
+                        acc.row(storeId, decompose(start), decompose(end));
+                        return acc.column("timestamp", decompose(timestamp));
+                    },
+                    rows,
+                    ignore -> false
+                );
+            }
+            return rows;
+        }
+    }
+
+    /**
+     * Virtual table {@code consensus_migration_state}: one row per table that has consensus
+     * migration state in the current cluster metadata and still exists in the schema.
+     */
+    public static final class MigrationStateTable extends AbstractVirtualTable
+    {
+        private static final Logger logger = LoggerFactory.getLogger(MigrationStateTable.class);
+
+        private MigrationStateTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord Consensus Migration State",
+                        "CREATE TABLE consensus_migration_state (\n" +
+                        "  keyspace_name text,\n" +
+                        "  table_name text,\n" +
+                        "  table_id uuid,\n" +
+                        "  target_protocol text,\n" +
+                        "  transactional_mode text,\n" +
+                        "  transactional_migration_from text,\n" +
+                        "  migrated_ranges frozen<list<text>>,\n" +
+                        "  migrating_ranges_by_epoch frozen<map<bigint, list<text>>>,\n" +
+                        "  PRIMARY KEY (keyspace_name, table_name)" +
+                        ')'));
+        }
+
+        /** Returns rows for every table state in the current consensus migration snapshot. */
+        @Override
+        public DataSet data()
+        {
+            ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState;
+            Collection<TableMigrationState> tableStates = snapshot.tableStates();
+            return data(tableStates);
+        }
+
+        /**
+         * Returns rows only for the tables of the keyspace named by the partition key.
+         *
+         * @param key partition key; its bytes are decoded as the UTF-8 keyspace name
+         * @throws InvalidRequestException if no keyspace with that name is known to the schema
+         */
+        @Override
+        public DataSet data(DecoratedKey key)
+        {
+            String keyspaceName = UTF8Type.instance.compose(key.getKey());
+            Keyspace keyspace = Schema.instance.getKeyspaceInstance(keyspaceName);
+
+            if (keyspace == null)
+                throw new InvalidRequestException("Unknown keyspace: '" + keyspaceName + '\'');
+
+            List<TableId> tableIDs = keyspace.getColumnFamilyStores()
+                                             .stream()
+                                             .map(ColumnFamilyStore::getTableId)
+                                             .collect(Collectors.toList());
+
+            ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState;
+            Collection<TableMigrationState> tableStates = snapshot.tableStatesFor(tableIDs);
+
+            return data(tableStates);
+        }
+
+        /**
+         * Converts table migration states into rows. States whose table can no longer be
+         * resolved in the schema are logged at WARN and skipped.
+         */
+        private SimpleDataSet data(Collection<TableMigrationState> tableStates)
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (TableMigrationState state : tableStates)
+            {
+                TableMetadata table = Schema.instance.getTableMetadata(state.tableId);
+
+                if (table == null)
+                {
+                    logger.warn("Table {}.{} (id: {}) no longer exists. It may have been dropped.",
+                                state.keyspaceName, state.tableName, state.tableId);
+                    continue;
+                }
+
+                result.row(state.keyspaceName, state.tableName);
+                result.column("table_id", state.tableId.asUUID());
+                result.column("target_protocol", state.targetProtocol.toString());
+                result.column("transactional_mode", table.params.transactionalMode.toString());
+                // Report the migrate-from mode here; previously this column repeated transactional_mode.
+                result.column("transactional_migration_from", table.params.transactionalMigrationFrom.toString());
+
+                // Ranges are exposed as their string form so the column types stay plain text.
+                List<String> primitiveMigratedRanges = state.migratedRanges.stream().map(Objects::toString).collect(toImmutableList());
+                result.column("migrated_ranges", primitiveMigratedRanges);
+
+                // LinkedHashMap keeps the iteration order of migratingRangesByEpoch.
+                Map<Long, List<String>> primitiveRangesByEpoch = new LinkedHashMap<>();
+                for (Map.Entry<org.apache.cassandra.tcm.Epoch, List<Range<Token>>> entry : state.migratingRangesByEpoch.entrySet())
+                    primitiveRangesByEpoch.put(entry.getKey().getEpoch(), entry.getValue().stream().map(Objects::toString).collect(toImmutableList()));
+
+                result.column("migrating_ranges_by_epoch", primitiveRangesByEpoch);
+            }
+
+            return result;
+        }
+    }
+
+    // TODO (desired): human readable packed key tracker (but requires loading Txn,
+    // so might be preferable to only do conditionally)
+    /**
+     * Virtual table {@code progress_log}: one row per (command store, txn id) entry read
+     * from each store's progress log view.
+     */
+    public static final class ProgressLogTable extends AbstractVirtualTable
+    {
+        private ProgressLogTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord per-CommandStore ProgressLog State",
+                        "CREATE TABLE progress_log (\n" +
+                        "  command_store_id int,\n" +
+                           format("txn_id %s,\n", TXN_ID_TYPE_STRING) +
+                        // Timer + BaseTxnState
+                        "  contact_everyone boolean,\n" +
+                        // WaitingState
+                        "  waiting_is_uninitialised boolean,\n" +
+                        "  waiting_blocked_until text,\n" +
+                        "  waiting_home_satisfies text,\n" +
+                        "  waiting_progress text,\n" +
+                        "  waiting_retry_counter int,\n" +
+                        "  waiting_packed_key_tracker_bits text,\n" +
+                        "  waiting_scheduled_at timestamp,\n" +
+                        // HomeState/TxnState
+                        "  home_phase text,\n" +
+                        "  home_progress text,\n" +
+                        "  home_retry_counter int,\n" +
+                        "  home_scheduled_at timestamp,\n" +
+                        "  PRIMARY KEY (command_store_id, txn_id)" +
+                        ')'));
+        }
+
+        /** Takes a view of every store's progress log, orders the views by store id, and emits rows. */
+        @Override
+        public DataSet data()
+        {
+            AccordService service = (AccordService) AccordService.instance();
+            CommandStores stores = service.node().commandStores();
+
+            List<DefaultProgressLog.ImmutableView> snapshots =
+                getBlockingAndRethrow(stores.map(store -> ((DefaultProgressLog) store.progressLog()).immutableView()));
+            snapshots.sort(comparing(DefaultProgressLog.ImmutableView::storeId));
+
+            SimpleDataSet rows = new SimpleDataSet(metadata());
+            for (DefaultProgressLog.ImmutableView cursor : snapshots)
+            {
+                while (cursor.advance())
+                {
+                    rows.row(cursor.storeId(), decompose(cursor.txnId()));
+                    rows.column("contact_everyone", cursor.contactEveryone());
+                    rows.column("waiting_is_uninitialised", cursor.isWaitingUninitialised());
+                    rows.column("waiting_blocked_until", cursor.waitingIsBlockedUntil().name());
+                    rows.column("waiting_home_satisfies", cursor.waitingHomeSatisfies().name());
+                    rows.column("waiting_progress", cursor.waitingProgress().name());
+                    rows.column("waiting_retry_counter", cursor.waitingRetryCounter());
+                    rows.column("waiting_packed_key_tracker_bits", Long.toBinaryString(cursor.waitingPackedKeyTrackerBits()));
+                    rows.column("waiting_scheduled_at", toTimestamp(cursor.timerScheduledAt(TxnStateKind.Waiting)));
+                    rows.column("home_phase", cursor.homePhase().name());
+                    rows.column("home_progress", cursor.homeProgress().name());
+                    rows.column("home_retry_counter", cursor.homeRetryCounter());
+                    rows.column("home_scheduled_at", toTimestamp(cursor.timerScheduledAt(TxnStateKind.Home)));
+                }
+            }
+            return rows;
+        }
+
+        /**
+         * Converts a timer deadline to a wall-clock {@link Date}, or returns null when there is
+         * no deadline. The deadline is multiplied by 1000 before translation to epoch millis.
+         */
+        private Date toTimestamp(Long deadline)
+        {
+            return deadline == null
+                 ? null
+                 : new Date(approxTime.translate().toMillisSinceEpoch(deadline * 1000L));
+        }
+    }
+
+    /**
+     * Virtual table {@code redundant_before}: one row per (command store, range) entry of
+     * each store's RedundantBefore state.
+     */
+    public static final class RedundantBeforeTable extends AbstractVirtualTable
+    {
+        private RedundantBeforeTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord per-CommandStore RedundantBefore State",
+                        "CREATE TABLE redundant_before (\n" +
+                        "  command_store_id int,\n" +
+                           format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  start_ownership_epoch bigint,\n" +
+                        "  end_ownership_epoch bigint,\n" +
+                           format("locally_applied_or_invalidated_before %s,\n", TXN_ID_TYPE_STRING) +
+                           format("locally_decided_and_applied_or_invalidated_before %s,\n", TXN_ID_TYPE_STRING) +
+                           format("shard_applied_or_invalidated_before %s,\n", TXN_ID_TYPE_STRING) +
+                           format("gc_before %s,\n", TXN_ID_TYPE_STRING) +
+                           format("shard_only_applied_or_invalidated_before %s,\n", TXN_ID_TYPE_STRING) +
+                           format("bootstrapped_at %s,\n", TXN_ID_TYPE_STRING) +
+                           format("stale_until_at_least %s,\n", TIMESTAMP_TYPE_STRING) +
+                        "  PRIMARY KEY (command_store_id, range_start, range_end)" +
+                        ')'));
+        }
+
+        /** Collects (store id, RedundantBefore) from every store, orders by id, and emits rows. */
+        @Override
+        public DataSet data()
+        {
+            AccordService service = (AccordService) AccordService.instance();
+            CommandStores stores = service.node().commandStores();
+
+            List<Pair<Integer, RedundantBefore>> perStore =
+                getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(),
+                                                                      store.commandStore().unsafeGetRedundantBefore())));
+            perStore.sort(comparing(p -> p.left));
+
+            SimpleDataSet rows = new SimpleDataSet(metadata());
+            for (int i = 0, size = perStore.size(); i < size; ++i)
+            {
+                Pair<Integer, RedundantBefore> current = perStore.get(i);
+                int storeId = current.left;
+
+                current.right.foldlWithBounds(
+                    (entry, acc, start, end) -> {
+                        acc.row(storeId, decompose(start), decompose(end));
+                        acc.column("start_ownership_epoch", entry.startOwnershipEpoch);
+                        acc.column("end_ownership_epoch", entry.endOwnershipEpoch);
+                        acc.column("locally_applied_or_invalidated_before", decompose(entry.locallyAppliedOrInvalidatedBefore));
+                        acc.column("locally_decided_and_applied_or_invalidated_before", decompose(entry.locallyDecidedAndAppliedOrInvalidatedBefore));
+                        acc.column("shard_applied_or_invalidated_before", decompose(entry.shardAppliedOrInvalidatedBefore));
+                        acc.column("gc_before", decompose(entry.gcBefore));
+                        acc.column("shard_only_applied_or_invalidated_before", decompose(entry.shardOnlyAppliedOrInvalidatedBefore));
+                        acc.column("bootstrapped_at", decompose(entry.bootstrappedAt));
+                        // staleUntilAtLeast is the only field that is null-checked before encoding
+                        acc.column("stale_until_at_least", entry.staleUntilAtLeast == null ? null : decompose(entry.staleUntilAtLeast));
+                        return acc;
+                    },
+                    rows,
+                    ignore -> false
+                );
+            }
+            return rows;
+        }
+    }
+
+    /**
+     * Virtual table {@code reject_before}: one row per (command store, range) entry of each
+     * store's RejectBefore state; stores without a RejectBefore contribute no rows.
+     */
+    public static final class RejectBeforeTable extends AbstractVirtualTable
+    {
+        private RejectBeforeTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord per-CommandStore RejectBefore State",
+                        "CREATE TABLE reject_before (\n" +
+                        "  command_store_id int,\n" +
+                           format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                           format("txn_id %s,\n", TXN_ID_TYPE_STRING) +
+                        "  PRIMARY KEY (command_store_id, range_start, range_end)" +
+                        ')'));
+        }
+
+        /** Collects (store id, RejectBefore) from every store, orders by id, and emits rows. */
+        @Override
+        public DataSet data()
+        {
+            AccordService service = (AccordService) AccordService.instance();
+            CommandStores stores = service.node().commandStores();
+
+            List<Pair<Integer, RejectBefore>> perStore =
+                getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(),
+                                                                      store.commandStore().unsafeGetRejectBefore())));
+            perStore.sort(comparing(p -> p.left));
+
+            SimpleDataSet rows = new SimpleDataSet(metadata());
+            for (int i = 0, size = perStore.size(); i < size; ++i)
+            {
+                Pair<Integer, RejectBefore> current = perStore.get(i);
+                int storeId = current.left;
+                RejectBefore rejectBefore = current.right;
+
+                // a store may report no RejectBefore at all; only fold when one is present
+                if (rejectBefore != null)
+                {
+                    rejectBefore.foldlWithBounds(
+                        (txnId, acc, start, end) -> {
+                            acc.row(storeId, decompose(start), decompose(end));
+                            return acc.column("txn_id", decompose(txnId));
+                        },
+                        rows,
+                        ignore -> false
+                    );
+                }
+            }
+            return rows;
+        }
+    }
+
+    public static class TxnBlockedByTable extends AbstractVirtualTable
+    {
+        enum Reason { Self, Txn, Key }
+
+        protected TxnBlockedByTable(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord Transactions Blocked By Table" ,
+                        "CREATE TABLE txn_blocked_by (\n" +
+                           format("txn_id %s,\n", TXN_ID_TYPE_STRING) +
+                        "  command_store_id int,\n" +
+                        "  depth int,\n" +
+                           format("blocked_by %s,\n", TXN_ID_TYPE_STRING) +
+                        "  reason text,\n" +
+                        "  save_status text,\n" +
+                           format("execute_at %s,\n", TIMESTAMP_TYPE_STRING) +
+                           format("key %s,\n", ROUTING_KEY_TYPE_STRING) +

Review Comment:
   Adding virtual UDTs correctly would involve:
   1. Adding a virtual table, `system_virtual_schema.types`, to expose all 
virtually-registered UDTs per virtual keyspace
   2. Modifying the drivers to read virtual UDT definitions from this table



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to