belliottsmith commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1103375136
##########
src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java:
##########
@@ -57,131 +59,94 @@
private State state = State.INITIALIZED;
private final AccordCommandStore commandStore;
- private final Iterable<TxnId> txnIds;
- private final Iterable<PartitionKey> keys;
+ private final List<TxnId> txnIds;
+ private final List<RoutableKey> keys;
- protected Future<?> readFuture;
+ protected AsyncResult<?> readResult;
- public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId>
txnIds, Iterable<PartitionKey> keys)
+ public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId>
txnIds, Iterable<RoutableKey> keys)
{
this.commandStore = commandStore;
- this.txnIds = txnIds;
- this.keys = keys;
+ this.txnIds = Lists.newArrayList(txnIds);
+ this.keys = Lists.newArrayList(keys);
}
- private <K, V extends AccordState<K>> Future<?> referenceAndDispatch(K key,
-
AccordStateCache.Instance<K, V> cache,
-
Map<K, V> context,
-
Function<V, Future<?>> readFunction,
-
Object callback)
- {
- V item;
- Future<?> future = cache.getLoadFuture(key);
- if (future != null)
- {
- // if a load future exists for this, it must be present in the
cache
- item = cache.getOrNull(key);
- Preconditions.checkState(item != null);
- context.put(key, item);
- if (logger.isTraceEnabled())
- logger.trace("Existing load future found for {} while loading
for {}. ({})", item.key(), callback, item);
- return future;
- }
-
- item = cache.getOrCreate(key);
- context.put(key, item);
- if (item.isLoaded())
- {
- if (logger.isTraceEnabled())
- logger.trace("Cached item found for {} while loading for {}.
({})", item.key(), callback, item);
- return null;
- }
-
- future = readFunction.apply(item);
- cache.setLoadFuture(item.key(), future);
- if (logger.isTraceEnabled())
- logger.trace("Loading new item for {} while loading for {}. ({})",
item.key(), callback, item);
- return future;
- }
-
-
- private <K, V extends AccordState<K>> List<Future<?>>
referenceAndDispatchReads(Iterable<K> keys,
+ private <K, V extends ImmutableState> List<AsyncChain<Void>>
referenceAndDispatchReads(Iterable<K> keys,
AccordStateCache.Instance<K, V> cache,
-
Map<K, V> context,
-
Function<V, Future<?>> readFunction,
-
List<Future<?>> futures,
+
LoadFunction<K, V> loadFunction,
+
List<AsyncChain<Void>> results,
Object callback)
{
for (K key : keys)
{
- Future<?> future = referenceAndDispatch(key, cache, context,
readFunction, callback);
- if (future == null)
+ AsyncResult<Void> result = cache.referenceAndLoad(key,
loadFunction);
+ if (result == null)
continue;
- if (futures == null)
- futures = new ArrayList<>();
+ if (results == null)
+ results = new ArrayList<>();
- futures.add(future);
+ results.add(result);
}
- return futures;
+ return results;
}
@VisibleForTesting
- Function<AccordCommand, Future<?>> loadCommandFunction(Object callback)
+ LoadFunction<TxnId, Command> loadCommandFunction(Object callback)
Review Comment:
So, firstly, I'm absolutely a hard -1 on introducing another stage for
reading from disk. We're performing a read, so it should go on the read stage,
which is currently our only mechanism for controlling read parallelism from
disk.
Secondly, sure, we "stall" transactions that depend on it. But the
alternative is stalling all other transactions (and non-transactions)?
There's essentially no need to permit an arbitrary level of fanout. Reading
a few records sequentially before processing a transaction is totally
acceptable. Certainly forbidding more than (some) N in parallel is just...
sensible? Why would you want the whole system to stop processing other work
because somebody submitted something that reads 500 keys with lots of data?
Also, those transactions that depend on it are more likely to be waiting for
the exact same data to be read from disk.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]