This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 61864e4634 IGNITE-19946 Implement batch log updates for RAFT. (#2273) 61864e4634 is described below commit 61864e46340478afb0faf13cce28b509df68cfe1 Author: Ivan Bessonov <bessonov...@gmail.com> AuthorDate: Tue Jul 11 12:59:39 2023 +0300 IGNITE-19946 Implement batch log updates for RAFT. (#2273) --- .../internal/raft/server/impl/JraftServerImpl.java | 20 +- .../storage/impl/DefaultLogStorageFactory.java | 43 +++- .../raft/storage/impl/RocksDbSharedLogStorage.java | 140 +++++------ .../raft/storage/impl/StripeAwareLogManager.java | 260 +++++++++++++++++++++ .../apache/ignite/raft/jraft/core/NodeImpl.java | 31 ++- .../raft/jraft/disruptor/StripedDisruptor.java | 17 +- .../ignite/raft/jraft/option/NodeOptions.java | 17 +- .../raft/jraft/storage/impl/LogManagerImpl.java | 43 ++-- .../ignite/disruptor/StripedDisruptorTest.java | 6 +- .../ignite/raft/jraft/core/FSMCallerTest.java | 3 +- .../raft/jraft/core/ReadOnlyServiceTest.java | 3 +- .../raft/jraft/storage/impl/LogManagerTest.java | 3 +- .../ItRaftCommandLeftInLogUntilRestartTest.java | 3 +- 13 files changed, 478 insertions(+), 111 deletions(-) diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index ce3285c841..7619821e8d 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.function.BiPredicate; +import java.util.stream.IntStream; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; import 
org.apache.ignite.internal.raft.RaftGroupEventsListener; @@ -50,6 +51,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.storage.LogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageFactory; import org.apache.ignite.internal.raft.storage.impl.IgniteJraftServiceFactory; +import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.thread.NamedThreadFactory; @@ -245,7 +247,9 @@ public class JraftServerImpl implements RaftServer { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-FSMCaller-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), ApplyTask::new, - opts.getStripes())); + opts.getStripes(), + false + )); } if (opts.getNodeApplyDisruptor() == null) { @@ -253,7 +257,9 @@ public class JraftServerImpl implements RaftServer { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-NodeImpl-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), LogEntryAndClosure::new, - opts.getStripes())); + opts.getStripes(), + false + )); } if (opts.getReadOnlyServiceDisruptor() == null) { @@ -261,7 +267,9 @@ public class JraftServerImpl implements RaftServer { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-ReadOnlyService-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), ReadIndexEvent::new, - opts.getStripes())); + opts.getStripes(), + false + )); } if (opts.getLogManagerDisruptor() == null) { @@ -269,7 +277,11 @@ public class JraftServerImpl implements RaftServer { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-LogManager-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), StableClosureEvent::new, - opts.getStripes())); + opts.getStripes(), + true + )); + + opts.setLogStripes(IntStream.range(0, 
opts.getStripes()).mapToObj(i -> new Stripe()).collect(toList())); } logStorageFactory.start(); diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java index f4b62d10b3..7558b2aafb 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageFactory.java @@ -45,6 +45,7 @@ import org.rocksdb.DBOptions; import org.rocksdb.Env; import org.rocksdb.Priority; import org.rocksdb.RocksDB; +import org.rocksdb.WriteBatch; import org.rocksdb.util.SizeUnit; /** Implementation of the {@link LogStorageFactory} that creates {@link RocksDbSharedLogStorage}s. */ @@ -69,6 +70,15 @@ public class DefaultLogStorageFactory implements LogStorageFactory { /** Data column family handle. */ private ColumnFamilyHandle dataHandle; + /** + * Thread-local batch instance, used by {@link RocksDbSharedLogStorage#appendEntriesToBatch(List)} and + * {@link RocksDbSharedLogStorage#commitWriteBatch()}. + * <br> + * Shared between instances to provide more efficient way of executing batch updates. + */ + @SuppressWarnings("ThreadLocalNotStaticFinal") + private final ThreadLocal<WriteBatch> threadLocalWriteBatch = new ThreadLocal<>(); + /** * Constructor. 
* @@ -77,8 +87,7 @@ public class DefaultLogStorageFactory implements LogStorageFactory { public DefaultLogStorageFactory(Path path) { this.path = path; - executorService = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors() * 2, + executorService = Executors.newSingleThreadExecutor( new NamedThreadFactory("raft-shared-log-storage-pool", LOG) ); } @@ -136,7 +145,35 @@ public class DefaultLogStorageFactory implements LogStorageFactory { /** {@inheritDoc} */ @Override public LogStorage createLogStorage(String groupId, RaftOptions raftOptions) { - return new RocksDbSharedLogStorage(db, confHandle, dataHandle, groupId, raftOptions, executorService); + return new RocksDbSharedLogStorage(this, db, confHandle, dataHandle, groupId, raftOptions, executorService); + } + + /** + * Returns a thread-local {@link WriteBatch} instance, attached to the current factory, that is used to append data from multiple storages at the same time. + */ + WriteBatch getOrCreateThreadLocalWriteBatch() { + WriteBatch writeBatch = threadLocalWriteBatch.get(); + + if (writeBatch == null) { + writeBatch = new WriteBatch(); + + threadLocalWriteBatch.set(writeBatch); + } + + return writeBatch; + } + + /** + * Clears {@link WriteBatch} returned by {@link #getOrCreateThreadLocalWriteBatch()}. 
+ */ + void clearThreadLocalWriteBatch() { + WriteBatch writeBatch = threadLocalWriteBatch.get(); + + if (writeBatch != null) { + writeBatch.close(); + + threadLocalWriteBatch.set(null); + } } /** diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java index 15f3f7b5a2..8d989f1596 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/RocksDbSharedLogStorage.java @@ -68,13 +68,6 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { RocksDB.loadLibrary(); } - /** - * An empty write context. - */ - private static class EmptyWriteContext implements WriteContext { - static EmptyWriteContext INSTANCE = new EmptyWriteContext(); - } - /** * VarHandle that gives the access to the elements of a {@code byte[]} array viewed as if it was a {@code long[]} * array. @@ -89,6 +82,9 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { */ private static final byte[] FIRST_LOG_IDX_KEY = Utils.getBytes("meta/firstLogIndex"); + /** Log factory instance, that created current log storage. */ + private final DefaultLogStorageFactory logStorageFactory; + /** Shared db instance. */ private final RocksDB db; @@ -142,6 +138,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { /** Constructor. 
*/ RocksDbSharedLogStorage( + DefaultLogStorageFactory logStorageFactory, RocksDB db, ColumnFamilyHandle confHandle, ColumnFamilyHandle dataHandle, @@ -163,6 +160,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { "Raft group id " + groupId + " must not contain char(1)" ); + this.logStorageFactory = logStorageFactory; this.db = db; this.confHandle = confHandle; this.dataHandle = dataHandle; @@ -176,6 +174,13 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { this.writeOptions.setSync(raftOptions.isSync()); } + /** + * Returns the log factory instance, that created current log storage. + */ + DefaultLogStorageFactory getLogStorageFactory() { + return logStorageFactory; + } + /** {@inheritDoc} */ @Override public boolean init(LogStorageOptions opts) { @@ -403,23 +408,17 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { return false; } - WriteContext writeCtx = newWriteContext(); long logIndex = entry.getId().getIndex(); byte[] valueBytes = this.logEntryEncoder.encode(entry); - byte[] newValueBytes = onDataAppend(logIndex, valueBytes, writeCtx); - writeCtx.startJob(); + byte[] newValueBytes = onDataAppend(logIndex, valueBytes); this.db.put(this.dataHandle, this.writeOptions, createKey(logIndex), newValueBytes); - writeCtx.joinAll(); if (newValueBytes != valueBytes) { doSync(); } return true; - } catch (RocksDBException | IOException e) { + } catch (RocksDBException e) { LOG.error("Fail to append entry.", e); return false; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return false; } finally { this.useLock.unlock(); } @@ -436,18 +435,14 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { int entriesCount = entries.size(); boolean ret = executeBatch(batch -> { - WriteContext writeCtx = newWriteContext(); - for (LogEntry entry : entries) { if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) { addConfBatch(entry, batch); } else 
{ - writeCtx.startJob(); - addDataBatch(entry, batch, writeCtx); + addDataBatch(entry, batch); } } - writeCtx.joinAll(); doSync(); }); @@ -458,6 +453,56 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { } } + /** + * Appends log entries to the batch, received from {@link DefaultLogStorageFactory#getOrCreateThreadLocalWriteBatch()}. This batch is + * shared between all instances of log, that belong to the given factory. + */ + boolean appendEntriesToBatch(List<LogEntry> entries) { + if (entries == null || entries.isEmpty()) { + return true; + } + + useLock.lock(); + + try { + WriteBatch writeBatch = logStorageFactory.getOrCreateThreadLocalWriteBatch(); + + for (LogEntry entry : entries) { + if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) { + addConfBatch(entry, writeBatch); + } else { + addDataBatch(entry, writeBatch); + } + } + + return true; + } catch (RocksDBException e) { + LOG.error("Execute batch failed with rocksdb exception.", e); + + return false; + } finally { + useLock.unlock(); + } + } + + /** + * Writes batch, previously filled by {@link #appendEntriesToBatch(List)} calls, into a rocksdb storage and clears the batch by calling + * {@link DefaultLogStorageFactory#clearThreadLocalWriteBatch()}. 
+ */ + void commitWriteBatch() { + try { + WriteBatch writeBatch = logStorageFactory.getOrCreateThreadLocalWriteBatch(); + + if (writeBatch.count() > 0) { + db.write(this.writeOptions, writeBatch); + } + } catch (RocksDBException e) { + LOG.error("Execute batch failed with rocksdb exception.", e); + } finally { + logStorageFactory.clearThreadLocalWriteBatch(); + } + } + /** {@inheritDoc} */ @Override public boolean truncateSuffix(long lastIndexKept) { @@ -573,11 +618,10 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { return true; } - private void addDataBatch(LogEntry entry, WriteBatch batch, - WriteContext ctx) throws RocksDBException, IOException, InterruptedException { + private void addDataBatch(LogEntry entry, WriteBatch batch) throws RocksDBException { long logIndex = entry.getId().getIndex(); byte[] content = this.logEntryEncoder.encode(entry); - batch.put(this.dataHandle, createKey(logIndex), onDataAppend(logIndex, content, ctx)); + batch.put(this.dataHandle, createKey(logIndex), onDataAppend(logIndex, content)); } private void truncatePrefixInBackground(long startIndex, long firstIndexKept) { @@ -628,14 +672,10 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { return ks; } - private void doSync() throws IOException, InterruptedException { + private void doSync() { onSync(); } - protected WriteContext newWriteContext() { - return EmptyWriteContext.INSTANCE; - } - /** * Called before appending data entry. * @@ -644,8 +684,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { * @return the new value */ @SuppressWarnings("unused") - protected byte[] onDataAppend(long logIndex, byte[] value, WriteContext ctx) throws IOException, InterruptedException { - ctx.finishJob(); + protected byte[] onDataAppend(long logIndex, byte[] value) { return value; } @@ -653,7 +692,7 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { * Called when sync data into file system. 
*/ @SuppressWarnings("RedundantThrows") - protected void onSync() throws IOException, InterruptedException { + protected void onSync() { } /** @@ -700,45 +739,6 @@ public class RocksDbSharedLogStorage implements LogStorage, Describer { void execute(WriteBatch batch) throws RocksDBException, IOException, InterruptedException; } - /** - * Write context. - */ - public interface WriteContext { - /** - * Start a sub job. - */ - default void startJob() { - } - - /** - * Finish a sub job. - */ - default void finishJob() { - } - - /** - * Adds a callback that will be invoked after all sub jobs finish. - */ - @SuppressWarnings("unused") - default void addFinishHook(@SuppressWarnings("unused") Runnable r) { - - } - - /** - * Set an exception to context. - * - * @param e exception - */ - default void setError(@SuppressWarnings("unused") Exception e) { - } - - /** - * Wait for all sub jobs finish. - */ - default void joinAll() throws InterruptedException, IOException { - } - } - /** {@inheritDoc} */ @Override public void describe(final Printer out) { diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java new file mode 100644 index 0000000000..4c7efa1ab5 --- /dev/null +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/StripeAwareLogManager.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.raft.storage.impl; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.ignite.internal.logger.IgniteLogger; +import org.apache.ignite.internal.logger.Loggers; +import org.apache.ignite.raft.jraft.Status; +import org.apache.ignite.raft.jraft.entity.LogEntry; +import org.apache.ignite.raft.jraft.entity.LogId; +import org.apache.ignite.raft.jraft.error.RaftError; +import org.apache.ignite.raft.jraft.option.LogManagerOptions; +import org.apache.ignite.raft.jraft.storage.LogStorage; +import org.apache.ignite.raft.jraft.storage.impl.LogManagerImpl; + +/** + * Log manager that enables batch processing of log entries from different partitions within a stripe. + * <br> + * Upon each flush of the log, it will also trigger flush on all the other log storages, that belong to the same stripe in + * corresponding {@link LogManagerOptions#getLogManagerDisruptor()}. + */ +public class StripeAwareLogManager extends LogManagerImpl { + /** The logger. */ + private static final IgniteLogger LOG = Loggers.forClass(StripeAwareLogManager.class); + + /** Log storage instance. */ + private LogStorage logStorage; + + /** Stripe, that corresponds to the current log storage instance. */ + private final Stripe stripe; + + /** Size threshold of log entries list, that will trigger the flush upon the excess. */ + private int maxAppendBufferSize; + + /** + * Whether the log storage is a {@link RocksDbSharedLogStorage} or not. 
+ * It requires special treatment in order to better optimize writes. + */ + private boolean sharedLogStorage; + + /** + * Constructor. + * + * @param stripe Stripe that corresponds to a worker thread in {@link LogManagerOptions#getLogManagerDisruptor()}. + */ + public StripeAwareLogManager(Stripe stripe) { + this.stripe = stripe; + } + + @Override + public boolean init(LogManagerOptions opts) { + LogStorage logStorage = opts.getLogStorage(); + + this.sharedLogStorage = logStorage instanceof RocksDbSharedLogStorage; + this.logStorage = logStorage; + this.maxAppendBufferSize = opts.getRaftOptions().getMaxAppendBufferSize(); + + return super.init(opts); + } + + /** + * Regular append to shared storage has been replaced with appending data into a batch. Data will later be "committed" by calling + * {@link StripeAwareAppendBatcher#commitWriteBatch()} on any of the log instances. + * The reason why is given in {@link Stripe}'s comments. + */ + @Override + protected int appendToLogStorage(List<LogEntry> toAppend) { + if (sharedLogStorage) { + return ((RocksDbSharedLogStorage) logStorage).appendEntriesToBatch(toAppend) ? toAppend.size() : 0; + } else { + return logStorage.appendEntries(toAppend); + } + } + + @Override + protected AppendBatcher newAppendBatcher(List<StableClosure> storages, int cap, LogId diskId) { + return new StripeAwareAppendBatcher(storages, cap, diskId); + } + + /** + * Append batcher implementation that triggers flush on all log storages, that belong to the same stripe. + */ + private class StripeAwareAppendBatcher extends AppendBatcher { + StripeAwareAppendBatcher(List<StableClosure> storage, int cap, LogId lastId) { + super(storage, cap, new ArrayList<>(), lastId); + } + + private LogId lastIdCandidate; + + /** + * Flush is simply delegated to the {@link Stripe}. + */ + @Override + protected LogId flush() { + stripe.flush(); + + // Last ID is already updated at this point. 
+ return lastId; + } + + /** + * Delegates to {@link LogManagerImpl#appendToStorage(List)}. + */ + void appendToStorage() { + assert size > 0; + + lastIdCandidate = StripeAwareLogManager.super.appendToStorage(toAppend); + } + + /** + * Delegates to {@link RocksDbSharedLogStorage#commitWriteBatch()} if it can. No-op otherwise. + */ + void commitWriteBatch() { + if (sharedLogStorage) { + ((RocksDbSharedLogStorage) logStorage).commitWriteBatch(); + } + } + + /** + * Delegates to {@link LogManagerImpl#reportError(int, String, Object...)}. + */ + void reportError(int code, String fmt, Object... args) { + StripeAwareLogManager.super.reportError(code, fmt, args); + } + + /** + * Notifies storage stable closures and clears the batch. Based on the code from {@link AppendBatcher#flush()}. + */ + void notifyClosures() { + lastId = lastIdCandidate; + + for (int i = 0; i < this.size; i++) { + this.storage.get(i).getEntries().clear(); + Status st = null; + try { + if (StripeAwareLogManager.super.hasError) { + st = new Status(RaftError.EIO, "Corrupted LogStorage"); + } else { + st = Status.OK(); + } + this.storage.get(i).run(st); + } catch (Throwable t) { + LOG.error("Fail to run closure with status: {}.", t, st); + } + } + + toAppend.clear(); + storage.clear(); + size = 0; + + setDiskId(lastId); + } + + @Override + protected void append(StableClosure done) { + if (stripe.size >= cap || stripe.bufferSize >= maxAppendBufferSize) { + flush(); + } + + // "super.append(done);" will calculate the size of update entries and put that value into "bufferSize". + // We use it later to add to "stripe.bufferSize". + bufferSize = 0; + super.append(done); + + stripe.addBatcher(this, bufferSize); + } + } + + /** + * Special instance, shared between different instances of {@link StripeAwareLogManager}, that correspond to the same stripe in the + * {@link LogManagerOptions#getLogManagerDisruptor()}. 
+ * <br> + * It accumulates data from different {@link AppendBatcher} instances, allowing to flush data from several log storages all at once. + * <br> + * Also supports batch log updates for {@link RocksDbSharedLogStorage}. + */ + public static class Stripe { + /** Cumulative data size of all data entries, not yet flushed in this stripe. */ + private int bufferSize; + + /** The number of all entry lists added to all {@link AppendBatcher}s, not yet flushed in this stripe. */ + private int size; + + /** The set of all append batchers that contain data not yet flushed by this stripe. */ + private final Set<StripeAwareAppendBatcher> appendBatchers = new HashSet<>(); + + /** + * Notifies the stripe that there's a new append to one of the append batchers. + * + * @param appendBatcher Append batcher that had an append. + * @param bufferSize The buffer size of that append. + */ + void addBatcher(StripeAwareAppendBatcher appendBatcher, int bufferSize) { + appendBatchers.add(appendBatcher); + + size++; + this.bufferSize += bufferSize; + } + + /** + * Performs a composite flush for all log storages that belong to the stripe. + */ + void flush() { + if (size == 0) { + return; + } + + // At first, all log storages should prepare the data by adding it to the write batch in the log storage factory. + for (StripeAwareAppendBatcher appendBatcher : appendBatchers) { + appendBatcher.appendToStorage(); + } + + if (!appendBatchers.isEmpty()) { + // Since the storage is shared, any batcher can flush it. + // This is a little confusing and hacky, but it doesn't require explicit access to the log storage factory, + // which makes it far easier to use in current jraft code. + // The reason why we don't call this method on log factory, for example, is because the factory doesn't have proper access + // to the RAFT configuration, and can't say, whether it should use "fsync" or not, for example. 
+ try { + appendBatchers.iterator().next().commitWriteBatch(); + } catch (Exception e) { + LOG.error("**Critical error**, failed to appendEntries.", e); + + for (StripeAwareAppendBatcher appendBatcher : appendBatchers) { + appendBatcher.reportError(RaftError.EIO.getNumber(), "Fail to append log entries"); + } + + return; + } + } + + // When data is committed, we can notify all stable closures and send response messages. + for (StripeAwareAppendBatcher appendBatcher : appendBatchers) { + appendBatcher.notifyClosures(); + } + + appendBatchers.clear(); + size = 0; + bufferSize = 0; + } + } +} diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java index 0ac43286e1..15cc2c0adb 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.raft.jraft.core; +import static java.util.stream.Collectors.toList; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.RingBuffer; @@ -35,12 +36,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.raft.JraftGroupEventsListener; import org.apache.ignite.internal.raft.RaftNodeDisruptorConfiguration; +import org.apache.ignite.internal.raft.storage.impl.RocksDbSharedLogStorage; +import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager; +import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; 
import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.raft.jraft.Closure; import org.apache.ignite.raft.jraft.FSMCaller; @@ -612,7 +617,8 @@ public class NodeImpl implements Node, RaftServerService { private boolean initLogStorage() { Requires.requireNonNull(this.fsmCaller, "Null fsm caller"); this.logStorage = this.serviceFactory.createLogStorage(this.options.getLogUri(), this.raftOptions); - this.logManager = new LogManagerImpl(); + int stripe = options.getLogManagerDisruptor().getStripe(getNodeId()); + this.logManager = new StripeAwareLogManager(options.getLogStripes().get(stripe)); final LogManagerOptions opts = new LogManagerOptions(); opts.setLogEntryCodecFactory(this.serviceFactory.createLogEntryCodecFactory()); opts.setLogStorage(this.logStorage); @@ -1279,7 +1285,9 @@ public class NodeImpl implements Node, RaftServerService { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-FSMCaller-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), () -> new FSMCallerImpl.ApplyTask(), - opts.getStripes())); + opts.getStripes(), + false + )); } else if (ownFsmCallerExecutorDisruptorConfig != null) { opts.setfSMCallerExecutorDisruptor(new StripedDisruptor<FSMCallerImpl.ApplyTask>( NamedThreadFactory.threadPrefix( @@ -1288,7 +1296,9 @@ public class NodeImpl implements Node, RaftServerService { ), opts.getRaftOptions().getDisruptorBufferSize(), () -> new FSMCallerImpl.ApplyTask(), - ownFsmCallerExecutorDisruptorConfig.getStripes())); + ownFsmCallerExecutorDisruptorConfig.getStripes(), + false + )); } if (opts.getNodeApplyDisruptor() == null) { @@ -1296,7 +1306,9 @@ public class NodeImpl implements Node, RaftServerService { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-NodeImpl-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), () -> new NodeImpl.LogEntryAndClosure(), - opts.getStripes())); + opts.getStripes(), + false + )); } if (opts.getReadOnlyServiceDisruptor() == null) { @@ -1304,7 +1316,9 @@ 
public class NodeImpl implements Node, RaftServerService { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-ReadOnlyService-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), () -> new ReadOnlyServiceImpl.ReadIndexEvent(), - opts.getStripes())); + opts.getStripes(), + false + )); } if (opts.getLogManagerDisruptor() == null) { @@ -1312,7 +1326,11 @@ public class NodeImpl implements Node, RaftServerService { NamedThreadFactory.threadPrefix(opts.getServerName(), "JRaft-LogManager-Disruptor"), opts.getRaftOptions().getDisruptorBufferSize(), () -> new LogManagerImpl.StableClosureEvent(), - opts.getStripes())); + opts.getStripes(), + logStorage instanceof RocksDbSharedLogStorage + )); + + opts.setLogStripes(IntStream.range(0, opts.getStripes()).mapToObj(i -> new Stripe()).collect(toList())); } } @@ -3188,6 +3206,7 @@ public class NodeImpl implements Node, RaftServerService { if (opts.getLogManagerDisruptor() != null && !opts.isSharedPools()) { opts.getLogManagerDisruptor().shutdown(); opts.setLogManagerDisruptor(null); + opts.setLogStripes(null); } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java index db0fb63299..2910756206 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/disruptor/StripedDisruptor.java @@ -61,19 +61,29 @@ public class StripedDisruptor<T extends NodeIdAware> { /** The Striped disruptor name. */ private final String name; + /** + * If {@code false}, this stripe will always pass {@code true} into {@link EventHandler#onEvent(Object, long, boolean)}. + * Otherwise, the data will be provided with batches. + */ + //TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching tasks until IGNITE-15568 has fixed. 
+ private final boolean supportsBatches; + /** * @param name Name of the Striped disruptor. * @param bufferSize Buffer size for each Disruptor. * @param eventFactory Event factory for the Striped disruptor. * @param stripes Amount of stripes. + * @param supportsBatches If {@code false}, this stripe will always pass {@code true} into + * {@link EventHandler#onEvent(Object, long, boolean)}. Otherwise, the data will be provided with batches. */ - public StripedDisruptor(String name, int bufferSize, EventFactory<T> eventFactory, int stripes) { + public StripedDisruptor(String name, int bufferSize, EventFactory<T> eventFactory, int stripes, boolean supportsBatches) { disruptors = new Disruptor[stripes]; queues = new RingBuffer[stripes]; eventHandlers = new ArrayList<>(stripes); exceptionHandlers = new ArrayList<>(stripes); this.stripes = stripes; this.name = name; + this.supportsBatches = supportsBatches; for (int i = 0; i < stripes; i++) { String stripeName = format("{}_stripe_{}-", name, i); @@ -160,7 +170,7 @@ public class StripedDisruptor<T extends NodeIdAware> { * @param nodeId Node id. * @return Stripe of the Striped disruptor. */ - private int getStripe(NodeId nodeId) { + public int getStripe(NodeId nodeId) { return Math.abs(nodeId.hashCode() % stripes); } @@ -213,8 +223,7 @@ public class StripedDisruptor<T extends NodeIdAware> { assert handler != null : format("Group of the event is unsupported [nodeId={}, event={}]", event.nodeId(), event); - //TODO: IGNITE-15568 endOfBatch should be set to true to prevent caching tasks until IGNITE-15568 has fixed. - handler.onEvent(event, sequence, subscribers.size() > 1 ? 
true : endOfBatch); + handler.onEvent(event, sequence, endOfBatch || subscribers.size() > 1 && !supportsBatches); } } diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java index e9a051512a..678dc61766 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java @@ -17,11 +17,14 @@ package org.apache.ignite.raft.jraft.option; import java.util.List; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.raft.JraftGroupEventsListener; +import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager; +import org.apache.ignite.internal.raft.storage.impl.StripeAwareLogManager.Stripe; import org.apache.ignite.raft.jraft.JRaftServiceFactory; import org.apache.ignite.raft.jraft.StateMachine; import org.apache.ignite.raft.jraft.conf.Configuration; @@ -247,6 +250,9 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> { /** */ private boolean sharedPools = false; + /** */ + private List<Stripe> logStripes; + public NodeOptions() { raftOptions.setRaftMessagesFactory(getRaftMessagesFactory()); } @@ -592,6 +598,14 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> { this.logManagerDisruptor = logManagerDisruptor; } + public void setLogStripes(List<Stripe> logStripes) { + this.logStripes = logStripes; + } + + public List<Stripe> getLogStripes() { + return logStripes; + } + public HybridClock getClock() { return clock; } @@ -627,6 +641,7 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> { 
nodeOptions.setfSMCallerExecutorDisruptor(this.getfSMCallerExecutorDisruptor()); nodeOptions.setReadOnlyServiceDisruptor(this.getReadOnlyServiceDisruptor()); nodeOptions.setLogManagerDisruptor(this.getLogManagerDisruptor()); + nodeOptions.setLogStripes(this.getLogStripes()); nodeOptions.setElectionTimer(this.getElectionTimer()); nodeOptions.setVoteTimer(this.getVoteTimer()); nodeOptions.setSnapshotTimer(this.getSnapshotTimer()); diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java index 906b573fa7..a6363108bb 100644 --- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerImpl.java @@ -79,7 +79,7 @@ public class LogManagerImpl implements LogManager { private final Lock writeLock = this.lock.writeLock(); private final Lock readLock = this.lock.readLock(); private volatile boolean stopped; - private volatile boolean hasError; + protected volatile boolean hasError; private long nextWaitId; private LogId diskId = new LogId(0, 0); // Last log entry written to disk. private LogId appliedId = new LogId(0, 0); @@ -409,7 +409,7 @@ public class LogManagerImpl implements LogManager { return true; } - private LogId appendToStorage(final List<LogEntry> toAppend) { + protected LogId appendToStorage(final List<LogEntry> toAppend) { LogId lastId = null; if (!this.hasError) { final long startMs = Utils.monotonicMs(); @@ -422,7 +422,7 @@ public class LogManagerImpl implements LogManager { writtenSize += entry.getData() != null ? 
entry.getData().remaining() : 0; } this.nodeMetrics.recordSize("append-logs-bytes", writtenSize); - final int nAppent = this.logStorage.appendEntries(toAppend); + final int nAppent = appendToLogStorage(toAppend); if (nAppent != entriesCount) { LOG.error("**Critical error**, fail to appendEntries, nAppent={}, toAppend={}", nAppent, toAppend.size()); @@ -431,7 +431,6 @@ public class LogManagerImpl implements LogManager { if (nAppent > 0) { lastId = toAppend.get(nAppent - 1).getId(); } - toAppend.clear(); } finally { this.nodeMetrics.recordLatency("append-logs", Utils.monotonicMs() - startMs); @@ -440,15 +439,19 @@ public class LogManagerImpl implements LogManager { return lastId; } - private class AppendBatcher { - List<StableClosure> storage; - int cap; - int size; - int bufferSize; - List<LogEntry> toAppend; - LogId lastId; + protected int appendToLogStorage(List<LogEntry> toAppend) { + return this.logStorage.appendEntries(toAppend); + } + + protected class AppendBatcher { + protected final List<StableClosure> storage; + protected final int cap; + protected int size; + protected int bufferSize; + protected final List<LogEntry> toAppend; + protected LogId lastId; - AppendBatcher(final List<StableClosure> storage, final int cap, final List<LogEntry> toAppend, + protected AppendBatcher(final List<StableClosure> storage, final int cap, final List<LogEntry> toAppend, final LogId lastId) { super(); this.storage = storage; @@ -457,7 +460,7 @@ public class LogManagerImpl implements LogManager { this.lastId = lastId; } - LogId flush() { + protected LogId flush() { if (this.size > 0) { this.lastId = appendToStorage(this.toAppend); for (int i = 0; i < this.size; i++) { @@ -485,7 +488,7 @@ public class LogManagerImpl implements LogManager { return this.lastId; } - void append(final StableClosure done) { + protected void append(final StableClosure done) { if (this.size == this.cap || this.bufferSize >= LogManagerImpl.this.raftOptions.getMaxAppendBufferSize()) { flush(); } @@ 
-498,10 +501,14 @@ public class LogManagerImpl implements LogManager { } } + protected AppendBatcher newAppendBatcher(List<StableClosure> storages, int cap, LogId diskId) { + return new AppendBatcher(storages, cap, new ArrayList<>(), diskId); + } + private class StableClosureEventHandler implements EventHandler<StableClosureEvent> { LogId lastId = LogManagerImpl.this.diskId; List<StableClosure> storage = new ArrayList<>(256); - AppendBatcher ab = new AppendBatcher(this.storage, 256, new ArrayList<>(), LogManagerImpl.this.diskId); + AppendBatcher ab = newAppendBatcher(this.storage, 256, LogManagerImpl.this.diskId); @Override public void onEvent(final StableClosureEvent event, final long sequence, final boolean endOfBatch) @@ -523,6 +530,8 @@ public class LogManagerImpl implements LogManager { } else { this.lastId = this.ab.flush(); + setDiskId(this.lastId); + boolean ret = true; switch (eventType) { case LAST_LOG_ID: @@ -582,14 +591,14 @@ public class LogManagerImpl implements LogManager { } - private void reportError(final int code, final String fmt, final Object... args) { + protected void reportError(final int code, final String fmt, final Object... 
args) { this.hasError = true; final RaftException error = new RaftException(ErrorType.ERROR_TYPE_LOG); error.setStatus(new Status(code, fmt, args)); this.fsmCaller.onError(error); } - private void setDiskId(final LogId id) { + protected void setDiskId(final LogId id) { if (id == null) { return; } diff --git a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java index f1e03bef70..58a487fa04 100644 --- a/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/disruptor/StripedDisruptorTest.java @@ -47,7 +47,8 @@ public class StripedDisruptorTest extends IgniteAbstractTest { StripedDisruptor<NodeIdAwareTestObj> disruptor = new StripedDisruptor<>("test-disruptor", 16384, NodeIdAwareTestObj::new, - 1); + 1, + false); var nodeId1 = new NodeId("grp1", new PeerId("foo")); var nodeId2 = new NodeId("grp2", new PeerId("foo")); @@ -96,7 +97,8 @@ public class StripedDisruptorTest extends IgniteAbstractTest { StripedDisruptor<NodeIdAwareTestObj> disruptor = new StripedDisruptor<>("test-disruptor", 16384, NodeIdAwareTestObj::new, - 5); + 5, + false); GroupAwareTestObjHandler handler = new GroupAwareTestObjHandler(); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java index eadd7c5900..a1bd084ef6 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/FSMCallerTest.java @@ -100,7 +100,8 @@ public class FSMCallerTest { opts.setfSMCallerExecutorDisruptor(disruptor = new StripedDisruptor<>("TestFSMDisruptor", 1024, () -> new FSMCallerImpl.ApplyTask(), - 1)); + 1, + false)); assertTrue(this.fsmCaller.init(opts)); } diff --git 
a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java index 818bc6526e..d1d64718aa 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/core/ReadOnlyServiceTest.java @@ -90,7 +90,8 @@ public class ReadOnlyServiceTest { opts.setReadOnlyServiceDisruptor(disruptor = new StripedDisruptor<>("TestReadOnlyServiceDisruptor", 1024, () -> new ReadOnlyServiceImpl.ReadIndexEvent(), - 1)); + 1, + false)); NodeOptions nodeOptions = new NodeOptions(); ExecutorService executor = JRaftUtils.createExecutor("test-executor", Utils.cpus()); executors.add(executor); diff --git a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java index 2ee03c1b8b..bbc882da10 100644 --- a/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/raft/jraft/storage/impl/LogManagerTest.java @@ -104,7 +104,8 @@ public class LogManagerTest extends BaseStorageTest { opts.setLogManagerDisruptor(disruptor = new StripedDisruptor<>("TestLogManagerDisruptor", 1024, () -> new LogManagerImpl.StableClosureEvent(), - 1)); + 1, + false)); assertTrue(this.logManager.init(opts)); } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java index 21b4f669bc..968a06742b 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java +++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItRaftCommandLeftInLogUntilRestartTest.java @@ -224,7 +224,8 @@ public class ItRaftCommandLeftInLogUntilRestartTest extends ClusterPerClassInteg NamedThreadFactory.threadPrefix(node.name() + "-test", "JRaft-FSMCaller-Disruptor"), 64, () -> new ApplyTask(), - 1 + 1, + false ) { @Override public RingBuffer<ApplyTask> subscribe(NodeId group, EventHandler<ApplyTask> handler,