belliottsmith commented on code in PR #2144:
URL: https://github.com/apache/cassandra/pull/2144#discussion_r1124334649
##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -34,101 +34,97 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Data;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
/**
* Cache for AccordCommand and AccordCommandsForKey, available memory is
shared between the two object types.
*
* Supports dynamic object sizes. After each acquire/free cycle, the cacheable
objects size is recomputed to
* account for data added/removed during txn processing if it's modified flag
is set
- *
- * TODO: explain how items move to and from the active pool and are evicted
*/
public class AccordStateCache
{
private static final Logger logger =
LoggerFactory.getLogger(AccordStateCache.class);
- private static class WriteOnlyGroup<K, V extends AccordState<K>>
+ public static class Node<K, V> extends AccordLoadingState<K, V>
{
- private boolean locked = false;
- private List<AccordState.WriteOnly<K, V>> items = new ArrayList<>();
+ static final long EMPTY_SIZE = ObjectSizes.measure(new
AccordStateCache.Node(null));
- @Override
- public String toString()
+ private Node<?, ?> prev;
+ private Node<?, ?> next;
+ private int references = 0;
+ private long lastQueriedEstimatedSizeOnHeap = 0;
+
+ public Node(K key)
{
- return "WriteOnlyGroup{" +
- "locked=" + locked +
- ", items=" + items +
- '}';
+ super(key);
}
- void lock()
+ public int referenceCount()
{
- locked = true;
+ return references;
}
- void add(AccordState.WriteOnly<K, V> item)
+ boolean isLoaded()
{
- items.add(item);
+ return state() == LoadingState.LOADED;
}
- void purge()
+ public boolean isComplete()
{
- if (locked)
- return;
-
- while (!items.isEmpty())
+ switch (state())
{
- AccordState.WriteOnly<K, V> item = items.get(0);
-
- // we can't remove items out of order, so if we encounter a
write is still pending, we stop
- if (item.future() == null || !item.future().isDone())
- break;
-
- items.remove(0);
+ case PENDING:
+ case NOT_FOUND:
+ return false;
+ case FAILED:
+ case LOADED:
+ return true;
+ default: throw new UnsupportedOperationException("Unknown
state: " + state());
}
}
- boolean isEmpty()
+ private boolean isUnlinked()
{
- return items.isEmpty();
+ return prev == null && next == null;
}
- }
- static class Node<K, V extends AccordState<K>>
- {
- static final long EMPTY_SIZE = ObjectSizes.measure(new
AccordStateCache.Node<>(null));
-
- final V value;
- private Node<?, ?> prev;
- private Node<?, ?> next;
- private int references = 0;
- private long lastQueriedEstimatedSizeOnHeap = 0;
+ long estimatedSizeOnHeap(ToLongFunction<V> estimator)
+ {
+ long result = EMPTY_SIZE;
+ V v;
+ if (isLoaded() && (v = value()) != null)
+ result += estimator.applyAsLong(v);
Review Comment:
why +=?
##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -34,101 +34,97 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.api.Data;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.Future;
-import org.apache.cassandra.utils.concurrent.FutureCombiner;
/**
* Cache for AccordCommand and AccordCommandsForKey, available memory is
shared between the two object types.
*
* Supports dynamic object sizes. After each acquire/free cycle, the cacheable
objects size is recomputed to
* account for data added/removed during txn processing if it's modified flag
is set
- *
- * TODO: explain how items move to and from the active pool and are evicted
*/
public class AccordStateCache
{
private static final Logger logger =
LoggerFactory.getLogger(AccordStateCache.class);
- private static class WriteOnlyGroup<K, V extends AccordState<K>>
+ public static class Node<K, V> extends AccordLoadingState<K, V>
{
- private boolean locked = false;
- private List<AccordState.WriteOnly<K, V>> items = new ArrayList<>();
+ static final long EMPTY_SIZE = ObjectSizes.measure(new
AccordStateCache.Node(null));
- @Override
- public String toString()
+ private Node<?, ?> prev;
+ private Node<?, ?> next;
+ private int references = 0;
+ private long lastQueriedEstimatedSizeOnHeap = 0;
+
+ public Node(K key)
{
- return "WriteOnlyGroup{" +
- "locked=" + locked +
- ", items=" + items +
- '}';
+ super(key);
}
- void lock()
+ public int referenceCount()
{
- locked = true;
+ return references;
}
- void add(AccordState.WriteOnly<K, V> item)
+ boolean isLoaded()
{
- items.add(item);
+ return state() == LoadingState.LOADED;
}
- void purge()
+ public boolean isComplete()
{
- if (locked)
- return;
-
- while (!items.isEmpty())
+ switch (state())
{
- AccordState.WriteOnly<K, V> item = items.get(0);
-
- // we can't remove items out of order, so if we encounter a
write is still pending, we stop
- if (item.future() == null || !item.future().isDone())
- break;
-
- items.remove(0);
+ case PENDING:
+ case NOT_FOUND:
+ return false;
+ case FAILED:
+ case LOADED:
+ return true;
+ default: throw new UnsupportedOperationException("Unknown
state: " + state());
}
}
- boolean isEmpty()
+ private boolean isUnlinked()
Review Comment:
isEligibleForEviction?
##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -346,226 +377,157 @@ public int hashCode()
return Objects.hash(keyClass, valClass);
}
- private V getOrCreate(K key, boolean createIfAbsent)
+ private Node<K, V> reference(K key, boolean createIfAbsent)
{
stats.queries++;
AccordStateCache.this.stats.queries++;
- Node<K, V> node = (Node<K, V>) active.get(key);
- if (node != null)
- {
- stats.hits++;
- AccordStateCache.this.stats.hits++;
- node.references++;
- return node.value;
- }
-
- node = (Node<K, V>) cache.remove(key);
-
+ Node<K, V> node = (Node<K, V>) cache.get(key);
if (node == null)
{
stats.misses++;
AccordStateCache.this.stats.misses++;
if (!createIfAbsent)
return null;
- V value = factory.apply(key);
- node = new Node<>(value);
- updateSize(node);
+ node = new Node<>(key);
+ // need to store ref right away, so eviction can not remove
+ node.references++;
+ cache.put(key, node);
+ updateSize(node, heapEstimator);
+ maybeEvict();
}
else
{
+ if (node.state() == AccordLoadingState.LoadingState.FAILED)
Review Comment:
static import?
##########
src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java:
##########
@@ -47,10 +52,30 @@
private static final String ASYNC_OPERATION = "async_op";
}
+ static class Context
+ {
+ final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
+ final HashMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys =
new HashMap<>();
+
+ void releaseResources(AccordCommandStore commandStore)
+ {
+ commands.values().forEach(commandStore.commandCache()::release);
+
commandsForKeys.values().forEach(commandStore.commandsForKeyCache()::release);
+ }
+
+ void revertChanges()
+ {
+ commands.values().forEach(AccordSafeState::revert);
+ commandsForKeys.values().forEach(AccordSafeState::revert);
+ }
+ }
+
enum State
{
INITIALIZED,
+ SUBMITTED,
Review Comment:
Unused? It would be good in general to comment these, at least the less
obvious ones, such as the difference between SAVING and AWAITING_SAVE, and
between LOADING and PREPARING.
##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -220,20 +231,21 @@ private void push(Node<?, ?> node)
head = node;
tail = node;
}
+ linked++;
}
- private void updateSize(Node<?, ?> node)
+ private <K, V> void updateSize(Node<K, V> node, ToLongFunction<V>
estimator)
{
- bytesCached += node.estimatedSizeOnHeapDelta();
+ bytesCached += node.estimatedSizeOnHeapDelta(estimator);
}
- // don't evict if there's an outstanding save future. If an item is
evicted then reloaded
+ // don't evict if there's an outstanding save result. If an item is
evicted then reloaded
// before it's mutation is applied, out of date info will be loaded
- private boolean canEvict(Object key)
+ private boolean canEvict(Node<?, ?> node)
{
- // getFuture only returns a future if it is running, so don't need to
check if its still running
- Future<?> future = getFuture(saveFutures, key);
- return future == null;
+ return node.references == 0 &&
+ node.isLoaded() &&
Review Comment:
Why can't we evict if we aren't loaded? Not sure if this ever comes up, but
since we don't account for any memory prior to loading, I don't think we gain
any greater accuracy by requiring that the node be loaded before evicting it
if, for some reason, the operation is abandoned. Ideally we'd be able to
interrupt the load, but that is a separate matter.
##########
src/java/org/apache/cassandra/service/accord/AccordStateCache.java:
##########
@@ -149,17 +145,12 @@ public NamedMap(String name)
}
}
- public final Map<Object, Node<?, ?>> active = new HashMap<>();
private final Map<Object, Node<?, ?>> cache = new HashMap<>();
- private final Map<Object, WriteOnlyGroup<?, ?>> pendingWriteOnly = new
HashMap<>();
- private final Set<Instance<?, ?>> instances = new HashSet<>();
-
- private final NamedMap<Object, Future<?>> loadFutures = new
NamedMap<>("loadFutures");
- private final NamedMap<Object, Future<?>> saveFutures = new
NamedMap<>("saveFutures");
+ private final Set<Instance<?, ?, ?>> instances = new HashSet<>();
- private final NamedMap<Object, Future<Data>> readFutures = new
NamedMap<>("readFutures");
- private final NamedMap<Object, Future<?>> writeFutures = new
NamedMap<>("writeFutures");
+ private final NamedMap<Object, AsyncResult<Void>> saveResults = new
NamedMap<>("saveResults");
+ private int linked = 0;
Review Comment:
eligibleForEviction?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]