dcapwell commented on code in PR #2339:
URL: https://github.com/apache/cassandra/pull/2339#discussion_r1204849063
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStore.java:
##########
@@ -94,17 +120,67 @@ public AccordCommandStore(int id,
this.commandCache = stateCache.instance(TxnId.class,
accord.local.Command.class, AccordSafeCommand::new, AccordObjectSizes::command);
this.commandsForKeyCache = stateCache.instance(RoutableKey.class,
CommandsForKey.class, AccordSafeCommandsForKey::new,
AccordObjectSizes::commandsForKey);
executor.execute(() -> CommandStore.register(this));
+ executor.execute(this::loadRangesToCommands);
}
- @Override
- public boolean inStore()
+ private void loadRangesToCommands()
{
- return Thread.currentThread().getId() == threadId;
+ AsyncPromise<CommandsForRanges> future = new AsyncPromise<>();
+ AccordKeyspace.findAllCommandsByDomain(id, Routable.Domain.Range,
ImmutableSet.of("txn_id", "status", "txn", "execute_at", "dependencies"), new
Observable<UntypedResultSet.Row>()
+ {
+ private CommandsForRanges.Builder builder = new
CommandsForRanges.Builder();
+ @Override
+ public void onNext(UntypedResultSet.Row row) throws Exception
+ {
+ TxnId txnId = AccordKeyspace.deserializeTxnId(row);
Review Comment:
just thought about this... the listener isn't durable, so I might need to
load them to re-create the listeners... :sigh:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]