dcapwell commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103101457
##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
     private State state = State.INITIALIZED;
     private final AccordCommandStore commandStore;
-    private final Iterable<TxnId> txnIds;
-    private final Iterable<PartitionKey> keys;
+    private final List<TxnId> txnIds;
+    private final List<RoutableKey> keys;

-    protected Future<?> readFuture;
+    protected AsyncResult<?> readResult;

-    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<PartitionKey> keys)
+    public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<RoutableKey> keys)
     {
         this.commandStore = commandStore;
-        this.txnIds = txnIds;
-        this.keys = keys;
+        this.txnIds = Lists.newArrayList(txnIds);
+        this.keys = Lists.newArrayList(keys);
     }

-    private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-                                                                         AccordStateCache.Instance<K, V> cache,
-                                                                         Map<K, V> context,
-                                                                         Function<V, Future<?>> readFunction,
-                                                                         Object callback)
-    {
-        V item;
-        Future<?> future = cache.getLoadFuture(key);
-        if (future != null)
-        {
-            // if a load future exists for this, it must be present in the cache
-            item = cache.getOrNull(key);
-            Preconditions.checkState(item != null);
-            context.put(key, item);
-            if (logger.isTraceEnabled())
-                logger.trace("Existing load future found for {} while loading for {}. ({})", item.key(), callback, item);
-            return future;
-        }
-
-        item = cache.getOrCreate(key);
-        context.put(key, item);
-        if (item.isLoaded())
-        {
-            if (logger.isTraceEnabled())
-                logger.trace("Cached item found for {} while loading for {}. ({})", item.key(), callback, item);
-            return null;
-        }
-
-        future = readFunction.apply(item);
-        cache.setLoadFuture(item.key(), future);
-        if (logger.isTraceEnabled())
-            logger.trace("Loading new item for {} while loading for {}. ({})", item.key(), callback, item);
-        return future;
-    }
-
-
-    private <K, V extends AccordState<K>> List<Future<?>> referenceAndDispatchReads(Iterable<K> keys,
+    private <K, V extends ImmutableState> List<AsyncChain<Void>> referenceAndDispatchReads(Iterable<K> keys,
                                                                                     AccordStateCache.Instance<K, V> cache,
-                                                                                    Map<K, V> context,
-                                                                                    Function<V, Future<?>> readFunction,
-                                                                                    List<Future<?>> futures,
+                                                                                    LoadFunction<K, V> loadFunction,
+                                                                                    List<AsyncChain<Void>> results,
                                                                                     Object callback)
     {
         for (K key : keys)
         {
-            Future<?> future = referenceAndDispatch(key, cache, context, readFunction, callback);
-            if (future == null)
+            AsyncResult<Void> result = cache.referenceAndLoad(key, loadFunction);
+            if (result == null)
                 continue;

-            if (futures == null)
-                futures = new ArrayList<>();
+            if (results == null)
+                results = new ArrayList<>();

-            futures.add(future);
+            results.add(result);
         }

-        return futures;
+        return results;
     }

     @VisibleForTesting
-    Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+    LoadFunction<TxnId, Command> loadCommandFunction(Object callback)
Review Comment:
   I don't see it, could you explain the case? If we don't have the tx in memory, we need to fetch it from somewhere before we can process it, which currently requires a read from the system table. If we limit how many loads we can do, do we not stall the transactions that depend on this one? The load function only comes into play when the data isn't in memory, so any operation that has all of its tx state in memory can make progress, while the ones that need to fetch are required to reschedule after that load.

   If the concern is about reusing the read stage because others may be using it, then I am cool with adding a new `METADATA_READ` stage which separates internal reads from user reads.
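
   To make that concrete, here is a minimal, hypothetical sketch using plain JDK types (none of these names are the actual AccordStateCache or Stage APIs): a referenceAndLoad-style call returns null on a cache hit so the caller proceeds immediately, and only misses dispatch a read on a dedicated executor standing in for the proposed `METADATA_READ` stage, after which the operation reschedules.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.function.Function;

   final class LoaderSketch<K, V>
   {
       private final Map<K, V> cache = new ConcurrentHashMap<>();
       // Stand-in for the proposed METADATA_READ stage: internal metadata reads run
       // here, separate from any user read stage.
       private final ExecutorService metadataReadStage = Executors.newFixedThreadPool(2);

       // Returns null when the value is already in memory; otherwise dispatches a load
       // and returns something the caller must wait on before rescheduling.
       CompletableFuture<Void> referenceAndLoad(K key, Function<K, V> loadFromSystemTable)
       {
           if (cache.containsKey(key))
               return null; // hit: the operation can make progress without waiting
           return CompletableFuture.runAsync(
               () -> cache.put(key, loadFromSystemTable.apply(key)), metadataReadStage);
       }

       // Collects the loads an operation must wait on; an empty list means all of its
       // state is already in memory and it can run now.
       List<CompletableFuture<Void>> dispatchReads(Iterable<K> keys, Function<K, V> load)
       {
           List<CompletableFuture<Void>> pending = new ArrayList<>();
           for (K key : keys)
           {
               CompletableFuture<Void> f = referenceAndLoad(key, load);
               if (f != null)
                   pending.add(f); // miss: reschedule after this load completes
           }
           return pending;
       }
   }
   ```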
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]