dcapwell commented on code in PR #2339:
URL: https://github.com/apache/cassandra/pull/2339#discussion_r1195696532


##########
src/java/org/apache/cassandra/service/accord/AccordKeyspace.java:
##########
@@ -595,13 +653,257 @@ public static UntypedResultSet 
loadCommandRow(CommandStore commandStore, TxnId t
     {
         String cql = "SELECT * FROM %s.%s " +
                      "WHERE store_id = ? " +
+                     "AND domain = ? " +
                      "AND txn_id=(?, ?, ?)";
 
         return executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, 
COMMANDS),
                                commandStore.id(),
+                               txnId.domain().ordinal(),
                                txnId.msb, txnId.lsb, txnId.node.id);
     }
 
+    public static void findAllCommandsByDomain(int commandStore, 
Routable.Domain domain, Set<String> columns, Observable<UntypedResultSet.Row> 
callback)
+    {
+        WalkCommandsForDomain work = new WalkCommandsForDomain(commandStore, 
domain, columns, Stage.READ.executor(), callback);
+        work.schedule();
+    }
+
+    private static abstract class TableWalk implements Runnable, DebuggableTask
+    {
+        private final long creationTimeNanos = Clock.Global.nanoTime();
+        private final Executor executor;
+        private final Observable<UntypedResultSet.Row> callback;
+        private long startTimeNanos = -1;
+        private int numQueries = 0;
+        private UntypedResultSet.Row lastSeen = null;
+
+        private TableWalk(Executor executor, Observable<UntypedResultSet.Row> 
callback)
+        {
+            this.executor = executor;
+            this.callback = callback;
+        }
+
+        protected boolean shouldContinue(UntypedResultSet.Row lastRow)
+        {
+            return true;
+        }
+
+        protected abstract UntypedResultSet query(UntypedResultSet.Row 
lastSeen);
+
+        public final void schedule()
+        {
+            executor.execute(this);
+        }
+
+        @Override
+        public final void run()
+        {
+            try
+            {
+                if (startTimeNanos == -1)
+                    startTimeNanos = Clock.Global.nanoTime();
+                numQueries++;
+                UntypedResultSet result = query(lastSeen);
+                if (result.isEmpty())
+                {
+                    callback.onCompleted();
+                    return;
+                }
+                UntypedResultSet.Row lastRow = null;
+                for (UntypedResultSet.Row row : result)
+                {
+                    callback.onNext(row);
+                    lastRow = row;
+                }
+                if (shouldContinue(lastRow))
+                {
+                    lastSeen = lastRow;
+                    schedule();
+                }
+                else
+                {
+                    lastSeen = null;
+                }
+            }
+            catch (Throwable t)
+            {
+                callback.onError(t);
+            }
+        }
+
+        @Override
+        public long creationTimeNanos()
+        {
+            return creationTimeNanos;
+        }
+
+        @Override
+        public long startTimeNanos()
+        {
+            return startTimeNanos;
+        }
+
+        @Override
+        public String description()
+        {
+            return String.format("Table Walker for %s; queries = %d", 
getClass().getSimpleName(), numQueries);
+        }
+    }
+
+    private static String selection(TableMetadata metadata, Set<String> 
requiredColumns, Set<String> forIteration)
+    {
+        StringBuilder selection = new StringBuilder();
+        if (requiredColumns.isEmpty())
+            selection.append("*");
+        else
+        {
+            Sets.SetView<String> other = Sets.difference(requiredColumns, 
forIteration);
+            for (String name : other)
+            {
+                ColumnMetadata meta = metadata.getColumn(new 
ColumnIdentifier(name, true));
+                if (meta == null)
+                    throw new IllegalArgumentException("Unknown column: " + 
name);
+            }
+            List<String> names = new ArrayList<>(forIteration.size() + 
other.size());
+            names.addAll(forIteration);
+            names.addAll(other);
+            // this sort is to make sure the CQL is determanistic
+            Collections.sort(names);
+            for (int i = 0; i < names.size(); i++)
+            {
+                if (i > 0)
+                    selection.append(", ");
+                selection.append(names.get(i));
+            }
+        }
+        return selection.toString();
+    }
+
+    private static class WalkCommandsForDomain extends TableWalk
+    {
+        private static final Set<String> COLUMNS_FOR_ITERATION = 
ImmutableSet.of("txn_id", "store_id", "domain");
+        private final String cql;
+        private final int storeId, domain;
+
+        private WalkCommandsForDomain(int commandStore, Routable.Domain 
domain, Set<String> requiredColumns, Executor executor, 
Observable<UntypedResultSet.Row> callback)
+        {
+            super(executor, callback);
+            this.storeId = commandStore;
+            this.domain = domain.ordinal();
+            cql = String.format("SELECT %s " +
+                                "FROM %s " +
+                                "WHERE store_id = ? " +
+                                "      AND domain = ? " +
+                                "      AND token(store_id, domain, txn_id) > 
token(?, ?, (?, ?, ?)) " +

Review Comment:
   Since we know this table has a token layout that matches the data (local 
partitioner), this helps make sure that the execution engine does what we want.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to