iamaleksey commented on code in PR #2256:
URL: https://github.com/apache/cassandra/pull/2256#discussion_r1184895444
##########
src/java/org/apache/cassandra/service/accord/AccordCommandStores.java:
##########
@@ -15,28 +15,102 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.cassandra.service.accord;
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.ProgressLog;
import accord.local.CommandStores;
import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommandStore;
import accord.local.ShardDistributor;
+import accord.primitives.Routables;
import accord.topology.Topology;
+import accord.utils.MapReduceConsume;
import accord.utils.RandomSource;
+import org.apache.cassandra.concurrent.ExecutionFailure;
+import org.apache.cassandra.concurrent.ImmediateExecutor;
+import org.apache.cassandra.journal.AsyncWriteCallback;
public class AccordCommandStores extends CommandStores<AccordCommandStore>
{
- private long cacheSize;
+ private final AccordJournal journal;
+
AccordCommandStores(NodeTimeService time, Agent agent, DataStore store,
RandomSource random,
- ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory)
+ ShardDistributor shardDistributor, ProgressLog.Factory
progressLogFactory, AccordJournal journal)
{
super(time, agent, store, random, shardDistributor,
progressLogFactory, AccordCommandStore::new);
+ this.journal = journal;
setCacheSize(maxCacheSize());
}
+ static Factory factory(AccordJournal journal)
+ {
+ return (time, agent, store, random, shardDistributor,
progressLogFactory) ->
+ new AccordCommandStores(time, agent, store, random,
shardDistributor, progressLogFactory, journal);
+ }
+
+ @Override
+ public synchronized void shutdown()
+ {
+ super.shutdown();
+ journal.shutdown();
+ //TODO shutdown isn't useful by itself, we need a way to "wait" as
well. Should be AutoCloseable or offer awaitTermination as well (think
Shutdownable interface)
+ }
+
+ @Override
+ protected <O> void mapReduceConsume(
+ PreLoadContext context,
+ Routables<?, ?> keys,
+ long minEpoch,
+ long maxEpoch,
+ MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
+ {
+ // append PreAccept, Accept, Commit, and Apply messages durably to
AccordJournal before processing
+ if (journal.mustMakeDurable(context))
+ mapReduceConsumeDurable(context, keys, minEpoch, maxEpoch,
mapReduceConsume);
+ else
+ super.mapReduceConsume(context, keys, minEpoch, maxEpoch,
mapReduceConsume);
+ }
+
+ private <O> void mapReduceConsumeDurable(
+ PreLoadContext context,
+ Routables<?, ?> keys,
+ long minEpoch,
+ long maxEpoch,
+ MapReduceConsume<? super SafeCommandStore, O> mapReduceConsume)
+ {
+ journal.append(context, ImmediateExecutor.INSTANCE, new
AsyncWriteCallback()
+ {
+ @Override
+ public void run()
+ {
+ // TODO (performance, expected): do not retain references to
messages beyond a certain total
+ // cache threshold; in case of flush lagging behind, read
the messages from journal and
+ // deserialize instead before processing, to prevent
memory pressure buildup from messages
+ // pending flush to disk.
+ AccordCommandStores.super.mapReduceConsume(context, keys,
minEpoch, maxEpoch, mapReduceConsume);
+ }
+
+ @Override
+ public void onFailure(Throwable error)
+ {
+ // should we invoke Agent#onUncaughtException() instead?
+ ExecutionFailure.handle(error);
Review Comment:
Good call, incorporated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]