This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 61864e4634 IGNITE-19946 Implement batch log updates for RAFT. (#2273)
61864e4634 is described below

commit 61864e46340478afb0faf13cce28b509df68cfe1
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Tue Jul 11 12:59:39 2023 +0300

    IGNITE-19946 Implement batch log updates for RAFT. (#2273)
---
 .../internal/raft/server/impl/JraftServerImpl.java |  20 +-
 .../storage/impl/DefaultLogStorageFactory.java     |  43 +++-
 .../raft/storage/impl/RocksDbSharedLogStorage.java | 140 +++++------
 .../raft/storage/impl/StripeAwareLogManager.java   | 260 +++++++++++++++++++++
 .../apache/ignite/raft/jraft/core/NodeImpl.java    |  31 ++-
 .../raft/jraft/disruptor/StripedDisruptor.java     |  17 +-
 .../ignite/raft/jraft/option/NodeOptions.java      |  17 +-
 .../raft/jraft/storage/impl/LogManagerImpl.java    |  43 ++--
 .../ignite/disruptor/StripedDisruptorTest.java     |   6 +-
 .../ignite/raft/jraft/core/FSMCallerTest.java      |   3 +-
 .../raft/jraft/core/ReadOnlyServiceTest.java       |   3 +-
 .../raft/jraft/storage/impl/LogManagerTest.java    |   3 +-
 .../ItRaftCommandLeftInLogUntilRestartTest.java    |   3 +-
 13 files changed, 478 insertions(+), 111 deletions(-)

diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index ce3285c841..7619821e8d 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiPredicate;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
 import org.apache.ignite.internal.raft.RaftGroupEventsListener;
@@ -50,6 +51,7 @@ import 
org.apache.ignite.internal.raft.service.RaftGroupListener;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory;
 import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory;
+import 
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -245,7 +247,9 @@ public class JraftServerImpl implements RaftServer {
                     NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-FSMCaller-Disruptor"),
                     opts.getRaftOptions().getDisruptorBufferSize(),
                     ApplyTask::new,
-                    opts.getStripes()));
+                    opts.getStripes(),
+                    false
+            ));
         }
 
         if (opts.getNodeApplyDisruptor() == null) {
@@ -253,7 +257,9 @@ public class JraftServerImpl implements RaftServer {
                     NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-NodeImpl-Disruptor"),
                     opts.getRaftOptions().getDisruptorBufferSize(),
                     LogEntryAndClosure::new,
-                    opts.getStripes()));
+                    opts.getStripes(),
+                    false
+            ));
         }
 
         if (opts.getReadOnlyServiceDisruptor() == null) {
@@ -261,7 +267,9 @@ public class JraftServerImpl implements RaftServer {
                     NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-ReadOnlyService-Disruptor"),
                     opts.getRaftOptions().getDisruptorBufferSize(),
                     ReadIndexEvent::new,
-                    opts.getStripes()));
+                    opts.getStripes(),
+                    false
+            ));
         }
 
         if (opts.getLogManagerDisruptor() == null) {
@@ -269,7 +277,11 @@ public class JraftServerImpl implements RaftServer {
                     NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-LogManager-Disruptor"),
                     opts.getRaftOptions().getDisruptorBufferSize(),
                     StableClosureEvent::new,
-                    opts.getStripes()));
+                    opts.getStripes(),
+                    true
+            ));
+
+            opts.setLogStripes(IntStream.range(0, 
opts.getStripes()).mapToObj(i -> new Stripe()).collect(toList()));
         }
 
         logStorageFactory.start();
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
index f4b62d10b3..7558b2aafb 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java
@@ -45,6 +45,7 @@ import org.rocksdb.DBOptions;
 import org.rocksdb.Env;
 import org.rocksdb.Priority;
 import org.rocksdb.RocksDB;
+import org.rocksdb.WriteBatch;
 import org.rocksdb.util.SizeUnit;
 
 /** Implementation of the {@link LogStorageFactory} that creates {@link 
RocksDbSharedLogStorage}s. */
@@ -69,6 +70,15 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
     /** Data column family handle. */
     private ColumnFamilyHandle dataHandle;
 
+    /**
+     * Thread-local batch instance, used by {@link 
RocksDbSharedLogStorage#appendEntriesToBatch(List)} and
+     * {@link RocksDbSharedLogStorage#commitWriteBatch()}.
+     * <br>
+     * Shared between instances to provide more efficient way of executing 
batch updates.
+     */
+    @SuppressWarnings("ThreadLocalNotStaticFinal")
+    private final ThreadLocal<WriteBatch> threadLocalWriteBatch = new 
ThreadLocal<>();
+
     /**
      * Constructor.
      *
@@ -77,8 +87,7 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
     public DefaultLogStorageFactory(Path path) {
         this.path = path;
 
-        executorService = Executors.newFixedThreadPool(
-                Runtime.getRuntime().availableProcessors() * 2,
+        executorService = Executors.newSingleThreadExecutor(
                 new NamedThreadFactory("raft-shared-log-storage-pool", LOG)
         );
     }
@@ -136,7 +145,35 @@ public class DefaultLogStorageFactory implements 
LogStorageFactory {
     /** {@inheritDoc} */
     @Override
     public LogStorage createLogStorage(String groupId, RaftOptions 
raftOptions) {
-        return new RocksDbSharedLogStorage(db, confHandle, dataHandle, 
groupId, raftOptions, executorService);
+        return new RocksDbSharedLogStorage(this, db, confHandle, dataHandle, 
groupId, raftOptions, executorService);
+    }
+
+    /**
+     * Returns a thread-local {@link WriteBatch} instance, attached to current 
factory, append data from multiple storages at the same time.
+     */
+    WriteBatch getOrCreateThreadLocalWriteBatch() {
+        WriteBatch writeBatch = threadLocalWriteBatch.get();
+
+        if (writeBatch == null) {
+            writeBatch = new WriteBatch();
+
+            threadLocalWriteBatch.set(writeBatch);
+        }
+
+        return writeBatch;
+    }
+
+    /**
+     * Clears {@link WriteBatch} returned by {@link 
#getOrCreateThreadLocalWriteBatch()}.
+     */
+    void clearThreadLocalWriteBatch() {
+        WriteBatch writeBatch = threadLocalWriteBatch.get();
+
+        if (writeBatch != null) {
+            writeBatch.close();
+
+            threadLocalWriteBatch.set(null);
+        }
     }
 
     /**
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
index 15f3f7b5a2..8d989f1596 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java
@@ -68,13 +68,6 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
         RocksDB.loadLibrary();
     }
 
-    /**
-     * An empty write context.
-     */
-    private static class EmptyWriteContext implements WriteContext {
-        static EmptyWriteContext INSTANCE = new EmptyWriteContext();
-    }
-
     /**
      * VarHandle that gives the access to the elements of a {@code byte[]} 
array viewed as if it was a {@code long[]}
      * array.
@@ -89,6 +82,9 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
      */
     private static final byte[] FIRST_LOG_IDX_KEY = 
Utils.getBytes("meta/firstLogIndex");
 
+    /** Log factory instance, that created current log storage. */
+    private final DefaultLogStorageFactory logStorageFactory;
+
     /** Shared db instance. */
     private final RocksDB db;
 
@@ -142,6 +138,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
 
     /** Constructor. */
     RocksDbSharedLogStorage(
+            DefaultLogStorageFactory logStorageFactory,
             RocksDB db,
             ColumnFamilyHandle confHandle,
             ColumnFamilyHandle dataHandle,
@@ -163,6 +160,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
                 "Raft group id " + groupId + " must not contain char(1)"
         );
 
+        this.logStorageFactory = logStorageFactory;
         this.db = db;
         this.confHandle = confHandle;
         this.dataHandle = dataHandle;
@@ -176,6 +174,13 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         this.writeOptions.setSync(raftOptions.isSync());
     }
 
+    /**
+     * Returns the log factory instance, that created current log storage.
+     */
+    DefaultLogStorageFactory getLogStorageFactory() {
+        return logStorageFactory;
+    }
+
     /** {@inheritDoc} */
     @Override
     public boolean init(LogStorageOptions opts) {
@@ -403,23 +408,17 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
                     return false;
                 }
 
-                WriteContext writeCtx = newWriteContext();
                 long logIndex = entry.getId().getIndex();
                 byte[] valueBytes = this.logEntryEncoder.encode(entry);
-                byte[] newValueBytes = onDataAppend(logIndex, valueBytes, 
writeCtx);
-                writeCtx.startJob();
+                byte[] newValueBytes = onDataAppend(logIndex, valueBytes);
                 this.db.put(this.dataHandle, this.writeOptions, 
createKey(logIndex), newValueBytes);
-                writeCtx.joinAll();
                 if (newValueBytes != valueBytes) {
                     doSync();
                 }
                 return true;
-            } catch (RocksDBException | IOException e) {
+            } catch (RocksDBException e) {
                 LOG.error("Fail to append entry.", e);
                 return false;
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                return false;
             } finally {
                 this.useLock.unlock();
             }
@@ -436,18 +435,14 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         int entriesCount = entries.size();
 
         boolean ret = executeBatch(batch -> {
-            WriteContext writeCtx = newWriteContext();
-
             for (LogEntry entry : entries) {
                 if (entry.getType() == 
EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                     addConfBatch(entry, batch);
                 } else {
-                    writeCtx.startJob();
-                    addDataBatch(entry, batch, writeCtx);
+                    addDataBatch(entry, batch);
                 }
             }
 
-            writeCtx.joinAll();
             doSync();
         });
 
@@ -458,6 +453,56 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         }
     }
 
+    /**
+     * Appends log entries to the batch, received from {@link 
DefaultLogStorageFactory#getOrCreateThreadLocalWriteBatch()}. This batch is
+     * shared between all instances of log, that belong to the given factory.
+     */
+    boolean appendEntriesToBatch(List<LogEntry> entries) {
+        if (entries == null || entries.isEmpty()) {
+            return true;
+        }
+
+        useLock.lock();
+
+        try {
+            WriteBatch writeBatch = 
logStorageFactory.getOrCreateThreadLocalWriteBatch();
+
+            for (LogEntry entry : entries) {
+                if (entry.getType() == 
EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
+                    addConfBatch(entry, writeBatch);
+                } else {
+                    addDataBatch(entry, writeBatch);
+                }
+            }
+
+            return true;
+        } catch (RocksDBException e) {
+            LOG.error("Execute batch failed with rocksdb exception.", e);
+
+            return false;
+        } finally {
+            useLock.unlock();
+        }
+    }
+
+    /**
+     * Writes batch, previously filled by {@link #appendEntriesToBatch(List)} 
calls, into a rocksdb storage and clears the batch by calling
+     * {@link DefaultLogStorageFactory#clearThreadLocalWriteBatch()}.
+     */
+    void commitWriteBatch() {
+        try {
+            WriteBatch writeBatch = 
logStorageFactory.getOrCreateThreadLocalWriteBatch();
+
+            if (writeBatch.count() > 0) {
+                db.write(this.writeOptions, writeBatch);
+            }
+        } catch (RocksDBException e) {
+            LOG.error("Execute batch failed with rocksdb exception.", e);
+        } finally {
+            logStorageFactory.clearThreadLocalWriteBatch();
+        }
+    }
+
     /** {@inheritDoc} */
     @Override
     public boolean truncateSuffix(long lastIndexKept) {
@@ -573,11 +618,10 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         return true;
     }
 
-    private void addDataBatch(LogEntry entry, WriteBatch batch,
-            WriteContext ctx) throws RocksDBException, IOException, 
InterruptedException {
+    private void addDataBatch(LogEntry entry, WriteBatch batch) throws 
RocksDBException {
         long logIndex = entry.getId().getIndex();
         byte[] content = this.logEntryEncoder.encode(entry);
-        batch.put(this.dataHandle, createKey(logIndex), onDataAppend(logIndex, 
content, ctx));
+        batch.put(this.dataHandle, createKey(logIndex), onDataAppend(logIndex, 
content));
     }
 
     private void truncatePrefixInBackground(long startIndex, long 
firstIndexKept) {
@@ -628,14 +672,10 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         return ks;
     }
 
-    private void doSync() throws IOException, InterruptedException {
+    private void doSync() {
         onSync();
     }
 
-    protected WriteContext newWriteContext() {
-        return EmptyWriteContext.INSTANCE;
-    }
-
     /**
      * Called before appending data entry.
      *
@@ -644,8 +684,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
      * @return the new value
      */
     @SuppressWarnings("unused")
-    protected byte[] onDataAppend(long logIndex, byte[] value, WriteContext 
ctx) throws IOException, InterruptedException {
-        ctx.finishJob();
+    protected byte[] onDataAppend(long logIndex, byte[] value) {
         return value;
     }
 
@@ -653,7 +692,7 @@ public class RocksDbSharedLogStorage implements LogStorage, 
Describer {
      * Called when sync data into file system.
      */
     @SuppressWarnings("RedundantThrows")
-    protected void onSync() throws IOException, InterruptedException {
+    protected void onSync() {
     }
 
     /**
@@ -700,45 +739,6 @@ public class RocksDbSharedLogStorage implements 
LogStorage, Describer {
         void execute(WriteBatch batch) throws RocksDBException, IOException, 
InterruptedException;
     }
 
-    /**
-     * Write context.
-     */
-    public interface WriteContext {
-        /**
-         * Start a sub job.
-         */
-        default void startJob() {
-        }
-
-        /**
-         * Finish a sub job.
-         */
-        default void finishJob() {
-        }
-
-        /**
-         * Adds a callback that will be invoked after all sub jobs finish.
-         */
-        @SuppressWarnings("unused")
-        default void addFinishHook(@SuppressWarnings("unused") Runnable r) {
-
-        }
-
-        /**
-         * Set an exception to context.
-         *
-         * @param e exception
-         */
-        default void setError(@SuppressWarnings("unused") Exception e) {
-        }
-
-        /**
-         * Wait for all sub jobs finish.
-         */
-        default void joinAll() throws InterruptedException, IOException {
-        }
-    }
-
     /** {@inheritDoc} */
     @Override
     public void describe(final Printer out) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
new file mode 100644
index 0000000000..4c7efa1ab5
--- /dev/null
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.storage.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.error.RaftError;
+import org.apache.ignite.raft.jraft.option.LogManagerOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl;
+
+/**
+ * Log manager that enables batch processing of log entries from different 
partitions within a stripe.
+ * <br>
+ * Upon each flush of the log, it will also trigger flush on all the other log 
storages, that belong to the same stripe in
+ * corresponding {@link LogManagerOptions#getLogManagerDisruptor()}.
+ */
+public class StripeAwareLogManager extends LogManagerImpl {
+    /** The logger. */
+    private static final IgniteLogger LOG = 
Loggers.forClass(StripeAwareLogManager.class);
+
+    /** Log storage instance. */
+    private LogStorage logStorage;
+
+    /** Stripe, that corresponds to the current log storage instance. */
+    private final Stripe stripe;
+
+    /** Size threshold of log entries list, that will trigger the flush upon 
the excess. */
+    private int maxAppendBufferSize;
+
+    /**
+     * Whether the log storage is a {@link RocksDbSharedLogStorage} or not.
+     * It requires special treatment in order to better optimize writes.
+     */
+    private boolean sharedLogStorage;
+
+    /**
+     * Constructor.
+     *
+     * @param stripe Stripe that corresponds to a worker thread in {@link 
LogManagerOptions#getLogManagerDisruptor()}.
+     */
+    public StripeAwareLogManager(Stripe stripe) {
+        this.stripe = stripe;
+    }
+
+    @Override
+    public boolean init(LogManagerOptions opts) {
+        LogStorage logStorage = opts.getLogStorage();
+
+        this.sharedLogStorage = logStorage instanceof RocksDbSharedLogStorage;
+        this.logStorage = logStorage;
+        this.maxAppendBufferSize = 
opts.getRaftOptions().getMaxAppendBufferSize();
+
+        return super.init(opts);
+    }
+
+    /**
+     * Regular append to shared storage has been replaced with appending data 
into a batch. Data will later be "committed" by calling
+     * {@link StripeAwareAppendBatcher#commitWriteBatch()} on any of the log 
instances.
+     * The reason why is given in {@link Stripe}'s comments.
+     */
+    @Override
+    protected int appendToLogStorage(List<LogEntry> toAppend) {
+        if (sharedLogStorage) {
+            return ((RocksDbSharedLogStorage) 
logStorage).appendEntriesToBatch(toAppend) ? toAppend.size() : 0;
+        } else {
+            return logStorage.appendEntries(toAppend);
+        }
+    }
+
+    @Override
+    protected AppendBatcher newAppendBatcher(List<StableClosure> storages, int 
cap, LogId diskId) {
+        return new StripeAwareAppendBatcher(storages, cap, diskId);
+    }
+
+    /**
+     * Append batcher implementation that triggers flush on all log storages, 
that belong to the same stripe.
+     */
+    private class StripeAwareAppendBatcher extends AppendBatcher {
+        StripeAwareAppendBatcher(List<StableClosure> storage, int cap, LogId 
lastId) {
+            super(storage, cap, new ArrayList<>(), lastId);
+        }
+
+        private LogId lastIdCandidate;
+
+        /**
+         * Flush is simply delegated to the {@link Stripe}.
+         */
+        @Override
+        protected LogId flush() {
+            stripe.flush();
+
+            // Last ID is already updated at this point.
+            return lastId;
+        }
+
+        /**
+         * Delegates to {@link LogManagerImpl#appendToStorage(List)}.
+         */
+        void appendToStorage() {
+            assert size > 0;
+
+            lastIdCandidate = 
StripeAwareLogManager.super.appendToStorage(toAppend);
+        }
+
+        /**
+         * Delegates to {@link RocksDbSharedLogStorage#commitWriteBatch()} if 
it can. No-op otherwise.
+         */
+        void commitWriteBatch() {
+            if (sharedLogStorage) {
+                ((RocksDbSharedLogStorage) logStorage).commitWriteBatch();
+            }
+        }
+
+        /**
+         * Delegates to {@link LogManagerImpl#reportError(int, String, 
Object...)}.
+         */
+        void reportError(int code, String fmt, Object... args) {
+            StripeAwareLogManager.super.reportError(code, fmt, args);
+        }
+
+        /**
+         * Notifies storage stable closures and clears the batch. Based on the 
code from {@link AppendBatcher#flush()}.
+         */
+        void notifyClosures() {
+            lastId = lastIdCandidate;
+
+            for (int i = 0; i < this.size; i++) {
+                this.storage.get(i).getEntries().clear();
+                Status st = null;
+                try {
+                    if (StripeAwareLogManager.super.hasError) {
+                        st = new Status(RaftError.EIO, "Corrupted LogStorage");
+                    } else {
+                        st = Status.OK();
+                    }
+                    this.storage.get(i).run(st);
+                } catch (Throwable t) {
+                    LOG.error("Fail to run closure with status: {}.", t, st);
+                }
+            }
+
+            toAppend.clear();
+            storage.clear();
+            size = 0;
+
+            setDiskId(lastId);
+        }
+
+        @Override
+        protected void append(StableClosure done) {
+            if (stripe.size >= cap || stripe.bufferSize >= 
maxAppendBufferSize) {
+                flush();
+            }
+
+            // "super.append(done);" will calculate the size of update entries 
and put that value into "bufferSize".
+            // We use it later to add to "stripe.bufferSize".
+            bufferSize = 0;
+            super.append(done);
+
+            stripe.addBatcher(this, bufferSize);
+        }
+    }
+
+    /**
+     * Special instance, shared between different instances of {@link 
StripeAwareLogManager}, that correspond to the same stripe in the
+     * {@link LogManagerOptions#getLogManagerDisruptor()}.
+     * <br>
+     * It accumulates data from different {@link AppendBatcher} instances, 
allowing to flush data from several log storages all at once.
+     * <br>
+     * Also supports batch log updates for {@link RocksDbSharedLogStorage}.
+     */
+    public static class Stripe {
+        /** Cumulative data size of all data entries, not yet flushed in this 
stripe. */
+        private int bufferSize;
+
+        /** The number of all entry lists added to all {@link AppendBatcher}s, 
not yet flushed in this stripe. */
+        private int size;
+
+        /** This list of all append batchers, that contain data not yet 
flushed by this stripe. */
+        private final Set<StripeAwareAppendBatcher> appendBatchers = new 
HashSet<>();
+
+        /**
+         * Notifies the stripe that there's a new append to one of the append 
batchers.
+         *
+         * @param appendBatcher Append batcher that had an append.
+         * @param bufferSize The buffer size of that append.
+         */
+        void addBatcher(StripeAwareAppendBatcher appendBatcher, int 
bufferSize) {
+            appendBatchers.add(appendBatcher);
+
+            size++;
+            this.bufferSize += bufferSize;
+        }
+
+        /**
+         * Performs an composite flush for all log storages that belong to the 
stripe.
+         */
+        void flush() {
+            if (size == 0) {
+                return;
+            }
+
+            // At first, all log storages should prepare the data by adding it 
to the write batch in the log storage factory.
+            for (StripeAwareAppendBatcher appendBatcher : appendBatchers) {
+                appendBatcher.appendToStorage();
+            }
+
+            if (!appendBatchers.isEmpty()) {
+                // Since the storage is shared, any batcher can flush it.
+                // This is a little confusing and hacky, but it doesn't 
require explicit access to the log storage factory,
+                // which makes it far easier to use in current jraft code.
+                // The reason why we don't call this method on log factory, 
for example, is because the factory doesn't have proper access
+                // to the RAFT configuration, and can't say, whether it should 
use "fsync" or not, for example.
+                try {
+                    appendBatchers.iterator().next().commitWriteBatch();
+                } catch (Exception e) {
+                    LOG.error("**Critical error**, failed to appendEntries.", 
e);
+
+                    for (StripeAwareAppendBatcher appendBatcher : 
appendBatchers) {
+                        appendBatcher.reportError(RaftError.EIO.getNumber(), 
"Fail to append log entries");
+                    }
+
+                    return;
+                }
+            }
+
+            // When data is committed, we can notify all stable closures and 
send response messages.
+            for (StripeAwareAppendBatcher appendBatcher : appendBatchers) {
+                appendBatcher.notifyClosures();
+            }
+
+            appendBatchers.clear();
+            size = 0;
+            bufferSize = 0;
+        }
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 0ac43286e1..15cc2c0adb 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.raft.jraft.core;
 
+import static java.util.stream.Collectors.toList;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.EventTranslator;
 import com.lmax.disruptor.RingBuffer;
@@ -35,12 +36,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.raft.JraftGroupEventsListener;
 import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration;
+import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage;
+import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager;
+import 
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.raft.jraft.Closure;
 import org.apache.ignite.raft.jraft.FSMCaller;
@@ -612,7 +617,8 @@ public class NodeImpl implements Node, RaftServerService {
     private boolean initLogStorage() {
         Requires.requireNonNull(this.fsmCaller, "Null fsm caller");
         this.logStorage = 
this.serviceFactory.createLogStorage(this.options.getLogUri(), 
this.raftOptions);
-        this.logManager = new LogManagerImpl();
+        int stripe = options.getLogManagerDisruptor().getStripe(getNodeId());
+        this.logManager = new 
StripeAwareLogManager(options.getLogStripes().get(stripe));
         final LogManagerOptions opts = new LogManagerOptions();
         
opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory());
         opts.setLogStorage(this.logStorage);
@@ -1279,7 +1285,9 @@ public class NodeImpl implements Node, RaftServerService {
                 NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-FSMCaller-Disruptor"),
                 opts.getRaftOptions().getDisruptorBufferSize(),
                 () -> new FSMCallerImpl.ApplyTask(),
-                opts.getStripes()));
+                opts.getStripes(),
+                false
+            ));
         } else if (ownFsmCallerExecutorDisruptorConfig != null) {
             opts.setfSMCallerExecutorDisruptor(new 
StripedDisruptor<FSMCallerImpl.ApplyTask>(
                 NamedThreadFactory.threadPrefix(
@@ -1288,7 +1296,9 @@ public class NodeImpl implements Node, RaftServerService {
                 ),
                 opts.getRaftOptions().getDisruptorBufferSize(),
                 () -> new FSMCallerImpl.ApplyTask(),
-                ownFsmCallerExecutorDisruptorConfig.getStripes()));
+                ownFsmCallerExecutorDisruptorConfig.getStripes(),
+                false
+            ));
         }
 
         if (opts.getNodeApplyDisruptor() == null) {
@@ -1296,7 +1306,9 @@ public class NodeImpl implements Node, RaftServerService {
                 NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-NodeImpl-Disruptor"),
                 opts.getRaftOptions().getDisruptorBufferSize(),
                 () -> new NodeImpl.LogEntryAndClosure(),
-                opts.getStripes()));
+                opts.getStripes(),
+                false
+            ));
         }
 
         if (opts.getReadOnlyServiceDisruptor() == null) {
@@ -1304,7 +1316,9 @@ public class NodeImpl implements Node, RaftServerService {
                 NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-ReadOnlyService-Disruptor"),
                 opts.getRaftOptions().getDisruptorBufferSize(),
                 () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
-                opts.getStripes()));
+                opts.getStripes(),
+                false
+            ));
         }
 
         if (opts.getLogManagerDisruptor() == null) {
@@ -1312,7 +1326,11 @@ public class NodeImpl implements Node, RaftServerService 
{
                 NamedThreadFactory.threadPrefix(opts.getServerName(), 
"JRaft-LogManager-Disruptor"),
                 opts.getRaftOptions().getDisruptorBufferSize(),
                 () -> new LogManagerImpl.StableClosureEvent(),
-                opts.getStripes()));
+                opts.getStripes(),
+                logStorage instanceof RocksDbSharedLogStorage
+            ));
+
+            opts.setLogStripes(IntStream.range(0, 
opts.getStripes()).mapToObj(i -> new Stripe()).collect(toList()));
         }
     }
 
@@ -3188,6 +3206,7 @@ public class NodeImpl implements Node, RaftServerService {
         if (opts.getLogManagerDisruptor() != null && !opts.isSharedPools()) {
             opts.getLogManagerDisruptor().shutdown();
             opts.setLogManagerDisruptor(null);
+            opts.setLogStripes(null);
         }
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
index db0fb63299..2910756206 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java
@@ -61,19 +61,29 @@ public class StripedDisruptor<T extends NodeIdAware> {
     /** The Striped disruptor name. */
     private final String name;
 
+    /**
+     * If {@code false}, this stripe will always pass {@code true} into {@link 
EventHandler#onEvent(Object, long, boolean)}.
+     * Otherwise, the data will be provided with batches.
+     */
+    //TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching 
tasks until IGNITE-15568 has fixed.
+    private final boolean supportsBatches;
+
     /**
      * @param name Name of the Striped disruptor.
      * @param bufferSize Buffer size for each Disruptor.
      * @param eventFactory Event factory for the Striped disruptor.
      * @param stripes Amount of stripes.
+     * @param supportsBatches If {@code false}, this stripe will always pass 
{@code true} into
+     *      {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise, 
the data will be provided with batches.
      */
-    public StripedDisruptor(String name, int bufferSize, EventFactory<T> 
eventFactory, int stripes) {
+    public StripedDisruptor(String name, int bufferSize, EventFactory<T> 
eventFactory, int stripes, boolean supportsBatches) {
         disruptors = new Disruptor[stripes];
         queues = new RingBuffer[stripes];
         eventHandlers = new ArrayList<>(stripes);
         exceptionHandlers = new ArrayList<>(stripes);
         this.stripes = stripes;
         this.name = name;
+        this.supportsBatches = supportsBatches;
 
         for (int i = 0; i < stripes; i++) {
             String stripeName = format("{}_stripe_{}-", name, i);
@@ -160,7 +170,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
      * @param nodeId Node id.
      * @return Stripe of the Striped disruptor.
      */
-    private int getStripe(NodeId nodeId) {
+    public int getStripe(NodeId nodeId) {
         return Math.abs(nodeId.hashCode() % stripes);
     }
 
@@ -213,8 +223,7 @@ public class StripedDisruptor<T extends NodeIdAware> {
 
             assert handler != null : format("Group of the event is unsupported 
[nodeId={}, event={}]", event.nodeId(), event);
 
-            //TODO: IGNITE-15568 endOfBatch should be set to true to prevent 
caching tasks until IGNITE-15568 has fixed.
-            handler.onEvent(event, sequence, subscribers.size() > 1 ? true : 
endOfBatch);
+            handler.onEvent(event, sequence, endOfBatch || subscribers.size() 
> 1 && !supportsBatches);
         }
     }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index e9a051512a..678dc61766 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -17,11 +17,14 @@
 package org.apache.ignite.raft.jraft.option;
 
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.raft.JraftGroupEventsListener;
+import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager;
+import 
org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe;
 import org.apache.ignite.raft.jraft.JRaftServiceFactory;
 import org.apache.ignite.raft.jraft.StateMachine;
 import org.apache.ignite.raft.jraft.conf.Configuration;
@@ -247,6 +250,9 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     /** */
     private boolean sharedPools = false;
 
+    /** */
+    private List<Stripe> logStripes;
+
     public NodeOptions() {
         raftOptions.setRaftMessagesFactory(getRaftMessagesFactory());
     }
@@ -592,6 +598,14 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         this.logManagerDisruptor = logManagerDisruptor;
     }
 
+    public void setLogStripes(List<Stripe> logStripes) {
+        this.logStripes = logStripes;
+    }
+
+    public List<Stripe> getLogStripes() {
+        return logStripes;
+    }
+
     public HybridClock getClock() {
         return clock;
     }
@@ -627,6 +641,7 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         
nodeOptions.setfSMCallerExecutorDisruptor(this.getfSMCallerExecutorDisruptor());
         
nodeOptions.setReadOnlyServiceDisruptor(this.getReadOnlyServiceDisruptor());
         nodeOptions.setLogManagerDisruptor(this.getLogManagerDisruptor());
+        nodeOptions.setLogStripes(this.getLogStripes());
         nodeOptions.setElectionTimer(this.getElectionTimer());
         nodeOptions.setVoteTimer(this.getVoteTimer());
         nodeOptions.setSnapshotTimer(this.getSnapshotTimer());
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
index 906b573fa7..a6363108bb 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java
@@ -79,7 +79,7 @@ public class LogManagerImpl implements LogManager {
     private final Lock writeLock = this.lock.writeLock();
     private final Lock readLock = this.lock.readLock();
     private volatile boolean stopped;
-    private volatile boolean hasError;
+    protected volatile boolean hasError;
     private long nextWaitId;
     private LogId diskId = new LogId(0, 0); // Last log entry written to disk.
     private LogId appliedId = new LogId(0, 0);
@@ -409,7 +409,7 @@ public class LogManagerImpl implements LogManager {
         return true;
     }
 
-    private LogId appendToStorage(final List<LogEntry> toAppend) {
+    protected LogId appendToStorage(final List<LogEntry> toAppend) {
         LogId lastId = null;
         if (!this.hasError) {
             final long startMs = Utils.monotonicMs();
@@ -422,7 +422,7 @@ public class LogManagerImpl implements LogManager {
                     writtenSize += entry.getData() != null ? 
entry.getData().remaining() : 0;
                 }
                 this.nodeMetrics.recordSize("append-logs-bytes", writtenSize);
-                final int nAppent = this.logStorage.appendEntries(toAppend);
+                final int nAppent = appendToLogStorage(toAppend);
                 if (nAppent != entriesCount) {
                     LOG.error("**Critical error**, fail to appendEntries, 
nAppent={}, toAppend={}", nAppent,
                         toAppend.size());
@@ -431,7 +431,6 @@ public class LogManagerImpl implements LogManager {
                 if (nAppent > 0) {
                     lastId = toAppend.get(nAppent - 1).getId();
                 }
-                toAppend.clear();
             }
             finally {
                 this.nodeMetrics.recordLatency("append-logs", 
Utils.monotonicMs() - startMs);
@@ -440,15 +439,19 @@ public class LogManagerImpl implements LogManager {
         return lastId;
     }
 
-    private class AppendBatcher {
-        List<StableClosure> storage;
-        int cap;
-        int size;
-        int bufferSize;
-        List<LogEntry> toAppend;
-        LogId lastId;
+    protected int appendToLogStorage(List<LogEntry> toAppend) {
+        return this.logStorage.appendEntries(toAppend);
+    }
+
+    protected class AppendBatcher {
+        protected final List<StableClosure> storage;
+        protected final int cap;
+        protected int size;
+        protected int bufferSize;
+        protected final List<LogEntry> toAppend;
+        protected LogId lastId;
 
-        AppendBatcher(final List<StableClosure> storage, final int cap, final 
List<LogEntry> toAppend,
+        protected AppendBatcher(final List<StableClosure> storage, final int 
cap, final List<LogEntry> toAppend,
             final LogId lastId) {
             super();
             this.storage = storage;
@@ -457,7 +460,7 @@ public class LogManagerImpl implements LogManager {
             this.lastId = lastId;
         }
 
-        LogId flush() {
+        protected LogId flush() {
             if (this.size > 0) {
                 this.lastId = appendToStorage(this.toAppend);
                 for (int i = 0; i < this.size; i++) {
@@ -485,7 +488,7 @@ public class LogManagerImpl implements LogManager {
             return this.lastId;
         }
 
-        void append(final StableClosure done) {
+        protected void append(final StableClosure done) {
             if (this.size == this.cap || this.bufferSize >= 
LogManagerImpl.this.raftOptions.getMaxAppendBufferSize()) {
                 flush();
             }
@@ -498,10 +501,14 @@ public class LogManagerImpl implements LogManager {
         }
     }
 
+    protected AppendBatcher newAppendBatcher(List<StableClosure> storages, int 
cap, LogId diskId) {
+        return new AppendBatcher(storages, cap, new ArrayList<>(), diskId);
+    }
+
     private class StableClosureEventHandler implements 
EventHandler<StableClosureEvent> {
         LogId lastId = LogManagerImpl.this.diskId;
         List<StableClosure> storage = new ArrayList<>(256);
-        AppendBatcher ab = new AppendBatcher(this.storage, 256, new 
ArrayList<>(), LogManagerImpl.this.diskId);
+        AppendBatcher ab = newAppendBatcher(this.storage, 256, 
LogManagerImpl.this.diskId);
 
         @Override
         public void onEvent(final StableClosureEvent event, final long 
sequence, final boolean endOfBatch)
@@ -523,6 +530,8 @@ public class LogManagerImpl implements LogManager {
             }
             else {
                 this.lastId = this.ab.flush();
+                setDiskId(this.lastId);
+
                 boolean ret = true;
                 switch (eventType) {
                     case LAST_LOG_ID:
@@ -582,14 +591,14 @@ public class LogManagerImpl implements LogManager {
 
     }
 
-    private void reportError(final int code, final String fmt, final Object... 
args) {
+    protected void reportError(final int code, final String fmt, final 
Object... args) {
         this.hasError = true;
         final RaftException error = new 
RaftException(ErrorType.ERROR_TYPE_LOG);
         error.setStatus(new Status(code, fmt, args));
         this.fsmCaller.onError(error);
     }
 
-    private void setDiskId(final LogId id) {
+    protected void setDiskId(final LogId id) {
         if (id == null) {
             return;
         }
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
index f1e03bef70..58a487fa04 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java
@@ -47,7 +47,8 @@ public class StripedDisruptorTest extends IgniteAbstractTest {
         StripedDisruptor<NodeIdAwareTestObj> disruptor = new 
StripedDisruptor<>("test-disruptor",
                 16384,
                 NodeIdAwareTestObj::new,
-                1);
+                1,
+                false);
 
         var nodeId1 = new NodeId("grp1", new PeerId("foo"));
         var nodeId2 = new NodeId("grp2", new PeerId("foo"));
@@ -96,7 +97,8 @@ public class StripedDisruptorTest extends IgniteAbstractTest {
         StripedDisruptor<NodeIdAwareTestObj> disruptor = new 
StripedDisruptor<>("test-disruptor",
                 16384,
                 NodeIdAwareTestObj::new,
-                5);
+                5,
+                false);
 
         GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler();
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
index eadd7c5900..a1bd084ef6 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java
@@ -100,7 +100,8 @@ public class FSMCallerTest {
         opts.setfSMCallerExecutorDisruptor(disruptor = new 
StripedDisruptor<>("TestFSMDisruptor",
             1024,
             () -> new FSMCallerImpl.ApplyTask(),
-            1));
+            1,
+            false));
         assertTrue(this.fsmCaller.init(opts));
     }
 
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
index 818bc6526e..d1d64718aa 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java
@@ -90,7 +90,8 @@ public class ReadOnlyServiceTest {
         opts.setReadOnlyServiceDisruptor(disruptor = new 
StripedDisruptor<>("TestReadOnlyServiceDisruptor",
             1024,
             () -> new ReadOnlyServiceImpl.ReadIndexEvent(),
-            1));
+            1,
+            false));
         NodeOptions nodeOptions = new NodeOptions();
         ExecutorService executor = JRaftUtils.createExecutor("test-executor", 
Utils.cpus());
         executors.add(executor);
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
index 2ee03c1b8b..bbc882da10 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java
@@ -104,7 +104,8 @@ public class LogManagerTest extends BaseStorageTest {
         opts.setLogManagerDisruptor(disruptor = new 
StripedDisruptor<>("TestLogManagerDisruptor",
             1024,
             () -> new LogManagerImpl.StableClosureEvent(),
-            1));
+            1,
+            false));
         assertTrue(this.logManager.init(opts));
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
index 21b4f669bc..968a06742b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java
@@ -224,7 +224,8 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends 
ClusterPerClassInteg
                 NamedThreadFactory.threadPrefix(node.name() + "-test", 
"JRaft-FSMCaller-Disruptor"),
                 64,
                 () -> new ApplyTask(),
-                1
+                1,
+                false
         ) {
             @Override
             public RingBuffer<ApplyTask> subscribe(NodeId group, 
EventHandler<ApplyTask> handler,


Reply via email to