alievmirza commented on code in PR #939:
URL: https://github.com/apache/ignite-3/pull/939#discussion_r946977713


##########
modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/UnlimitedBudgetConfigurationSchema.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.table;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+
+/**
+ * Configuration for 'unlimited' log storage budget.

Review Comment:
   Let's add a link to the corresponding interface here instead of the 
'unlimited' phrase. 



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/EntryCountBudget.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import java.util.List;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+
+/**
+ * {@link LogStorageBudget} that makes sure that no more entries than the 
provided limit is stored.
+ *
+ * <p>This is not thread safe as {@link LogStorageBudget} implementations do 
not need to be thread safe (because all budget methods
+ * are called under locks).
+ */
+public class EntryCountBudget implements LogStorageBudget {
+
+    private static final long NO_INDEX = -1;
+
+    private final long entriesCountLimit;
+
+    private long firstIndex = NO_INDEX;
+    private long lastIndex = NO_INDEX;
+
+    public EntryCountBudget(long entriesCountLimit) {
+        this.entriesCountLimit = entriesCountLimit;
+    }
+
+    @Override
+    public boolean hasRoomFor(LogEntry entry) {

Review Comment:
   It seems we don't need the entry here. Since it isn't needed in any of the 
implementations of 
`org.apache.ignite.raft.jraft.storage.impl.LogStorageBudget#hasRoomFor`, maybe 
we can simplify the signature? 



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java:
##########
@@ -17,15 +17,64 @@
 
 package org.apache.ignite.internal.raft.storage.impl;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import org.apache.ignite.configuration.schemas.table.LogStorageBudgetView;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.core.LogStorageBudgetFactory;
+import org.apache.ignite.raft.jraft.core.LogStorageBudgetsModule;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.raft.jraft.storage.impl.LocalLogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LogStorageBudget;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileLogStorage;
 
 /**
  * Log storage factory based on {@link LocalLogStorage}.

Review Comment:
   This should probably say `based on VolatileLogStorage`.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java:
##########
@@ -17,15 +17,64 @@
 
 package org.apache.ignite.internal.raft.storage.impl;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import org.apache.ignite.configuration.schemas.table.LogStorageBudgetView;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.raft.jraft.core.LogStorageBudgetFactory;
+import org.apache.ignite.raft.jraft.core.LogStorageBudgetsModule;
 import org.apache.ignite.raft.jraft.option.RaftOptions;
 import org.apache.ignite.raft.jraft.storage.LogStorage;
 import org.apache.ignite.raft.jraft.storage.impl.LocalLogStorage;
+import org.apache.ignite.raft.jraft.storage.impl.LogStorageBudget;
+import org.apache.ignite.raft.jraft.storage.impl.VolatileLogStorage;
 
 /**
  * Log storage factory based on {@link LocalLogStorage}.
  */
 public class VolatileLogStorageFactory implements LogStorageFactory {
+    private final LogStorageBudgetView logStorageBudgetConfig;
+
+    private final Map<String, LogStorageBudgetFactory> budgetFactories;
+
+    /**
+     * Creates a new instance.
+     *
+     * @param logStorageBudgetConfig Budget config.
+     */
+    public VolatileLogStorageFactory(LogStorageBudgetView 
logStorageBudgetConfig) {
+        this.logStorageBudgetConfig = logStorageBudgetConfig;
+
+        Map<String, LogStorageBudgetFactory> factories = new HashMap<>();
+
+        ClassLoader serviceClassLoader = 
Thread.currentThread().getContextClassLoader();
+
+        for (LogStorageBudgetsModule module : 
ServiceLoader.load(LogStorageBudgetsModule.class, serviceClassLoader)) {
+            Map<String, LogStorageBudgetFactory> factoriesFromModule = 
module.budgetFactories();
+
+            checkForBudgetNameClashes(factories.keySet(), 
factoriesFromModule.keySet());
+
+            factories.putAll(factoriesFromModule);
+        }
+
+        budgetFactories = Map.copyOf(factories);
+    }
+
+    private void checkForBudgetNameClashes(Set<String> names1, Set<String> 
names2) {
+        Set<String> intersection = new HashSet<>(names1);
+        intersection.retainAll(names2);
+
+        if (!intersection.isEmpty()) {
+            throw new IgniteInternalException(

Review Comment:
   Is there any way to avoid using the deprecated constructor of 
`IgniteInternalException`?
   Maybe we should introduce some error codes for this scenario.



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/EntryCountBudget.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import java.util.List;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+
+/**
+ * {@link LogStorageBudget} that makes sure that no more entries than the 
provided limit is stored.
+ *
+ * <p>This is not thread safe as {@link LogStorageBudget} implementations do 
not need to be thread safe (because all budget methods
+ * are called under locks).
+ */
+public class EntryCountBudget implements LogStorageBudget {
+
+    private static final long NO_INDEX = -1;
+
+    private final long entriesCountLimit;
+
+    private long firstIndex = NO_INDEX;
+    private long lastIndex = NO_INDEX;
+
+    public EntryCountBudget(long entriesCountLimit) {
+        this.entriesCountLimit = entriesCountLimit;
+    }
+
+    @Override
+    public boolean hasRoomFor(LogEntry entry) {
+        return storedEntries() < entriesCountLimit;
+    }
+
+    private long storedEntries() {
+        if (firstIndex == NO_INDEX && lastIndex == NO_INDEX) {
+            return 0;
+        } else if (firstIndex != NO_INDEX && lastIndex != NO_INDEX) {
+            return lastIndex - firstIndex + 1;
+        } else {
+            throw new IllegalStateException("Only one of firstIndex and 
lastIndex is initialized: " + firstIndex + " and " + lastIndex);
+        }
+    }
+
+    @Override
+    public void onAppended(LogEntry entry) {

Review Comment:
   It seems that we don't need the whole entry here; we can use only the 
corresponding index. Let's simplify it and use `long` here and in 
`EntryCountBudget#onAppended(java.util.List<org.apache.ignite.raft.jraft.entity.LogEntry>)`.



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java:
##########
@@ -64,6 +64,16 @@ public static RaftGroupOptions forVolatileStores() {
         return new RaftGroupOptions(true);
     }
 
+    /**
+     * Creates options derived from table configuration.
+     *
+     * @param isVolatile Whether the table is configured as volatile 
(in-memory) or not.
+     * @return Options derived from table configuration.
+     */
+    public static RaftGroupOptions forTable(boolean isVolatile) {

Review Comment:
   Why do we need this? I don't see any usages



##########
modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/EntryCountBudgetConfigurationSchema.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.table;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+import org.apache.ignite.configuration.annotation.Value;
+
+/**
+ * Configuration for 'entry-count' log storage budget.

Review Comment:
   Let's add a link to the corresponding interface here instead of the 
'entry-count' phrase. It is quite unclear from the context what we are 
talking about. 



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/VolatileLogStorageFactory.java:
##########
@@ -34,7 +83,21 @@ public void start() {
     /** {@inheritDoc} */
     @Override
     public LogStorage createLogStorage(String uri, RaftOptions raftOptions) {
-        return new LocalLogStorage(raftOptions);
+        return new VolatileLogStorage(createLogStorageBudget());
+    }
+
+    private LogStorageBudget createLogStorageBudget() {
+        return newBudget(logStorageBudgetConfig);
+    }
+
+    private LogStorageBudget newBudget(LogStorageBudgetView 
logStorageBudgetConfig) {
+        LogStorageBudgetFactory factory = 
budgetFactories.get(logStorageBudgetConfig.name());
+
+        if (factory == null) {
+            throw new IgniteInternalException("Cannot find a log storage 
budget by name '" + logStorageBudgetConfig.name() + "'");

Review Comment:
   Is there any way to avoid using the deprecated constructor of 
`IgniteInternalException`? 
   Maybe we should introduce some error codes for this scenario. 



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileLogStorage.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.Requires;
+
+/**
+ * Stores RAFT log in memory.
+ */
+public class VolatileLogStorage implements LogStorage, Describer, 
VolatileStorage {
+    private static final IgniteLogger LOG = 
Loggers.forClass(VolatileLogStorage.class);
+
+    private final LogStorageBudget budget;
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+
+    private final NavigableMap<Long, LogEntry> log = new 
ConcurrentSkipListMap<>();
+
+    private LogEntryEncoder logEntryEncoder;
+    private LogEntryDecoder logEntryDecoder;
+
+    private volatile long firstLogIndex = 1;
+    private volatile long lastLogIndex = 0;
+
+    private volatile boolean initialized = false;
+
+    public VolatileLogStorage(LogStorageBudget budget) {
+        super();
+
+        this.budget = budget;
+    }
+
+    @Override
+    public boolean init(final LogStorageOptions opts) {
+        Requires.requireNonNull(opts.getConfigurationManager(), "Null conf 
manager");
+        Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log 
entry codec factory");
+
+        this.writeLock.lock();
+
+        try {
+            if (initialized) {
+                LOG.warn("VolatileLogStorage init() was already called.");
+                return true;
+            }
+            this.initialized = true;
+            this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+            this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+            Requires.requireNonNull(this.logEntryDecoder, "Null log entry 
decoder");
+            Requires.requireNonNull(this.logEntryEncoder, "Null log entry 
encoder");
+
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        this.writeLock.lock();
+
+        try {
+            this.initialized = false;
+            this.log.clear();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    @Override
+    public long getFirstLogIndex() {
+        this.readLock.lock();
+
+        try {
+            return this.firstLogIndex;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public long getLastLogIndex() {
+        this.readLock.lock();
+
+        try {
+            return this.lastLogIndex;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public LogEntry getEntry(final long index) {
+        this.readLock.lock();
+
+        try {
+            if (index < getFirstLogIndex()) {
+                return null;
+            }
+
+            return log.get(index);
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public long getTerm(final long index) {
+        final LogEntry entry = getEntry(index);
+        if (entry != null) {
+            return entry.getId().getTerm();
+        }
+        return 0;
+    }
+
+    @Override
+    public boolean appendEntry(final LogEntry entry) {
+        this.readLock.lock();
+
+        try {
+            if (!initialized) {
+                LOG.warn("DB not initialized or destroyed.");
+                return false;
+            }
+
+            this.log.put(entry.getId().getIndex(), entry);
+
+            lastLogIndex = log.lastKey();
+            firstLogIndex = log.firstKey();
+
+            budget.onAppended(entry);
+
+            return true;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public int appendEntries(final List<LogEntry> entries) {
+        if (entries == null || entries.isEmpty()) {
+            return 0;
+        }
+
+        final int entriesCount = entries.size();
+
+        this.readLock.lock();
+
+        try {
+            if (!initialized) {
+                LOG.warn("DB not initialized or destroyed.");
+                return 0;
+            }
+
+            for (LogEntry logEntry : entries) {
+                log.put(logEntry.getId().getIndex(), logEntry);
+            }
+
+            lastLogIndex = log.lastKey();
+            firstLogIndex = log.firstKey();
+
+            budget.onAppended(entries);

Review Comment:
   If this will be added later, let's at least add a TODO with the 
corresponding ticket. 



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/storage/impl/VolatileLogStorage.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.raft.jraft.storage.impl;
+
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.raft.jraft.entity.EnumOutter;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryDecoder;
+import org.apache.ignite.raft.jraft.entity.codec.LogEntryEncoder;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.apache.ignite.raft.jraft.storage.VolatileStorage;
+import org.apache.ignite.raft.jraft.util.Describer;
+import org.apache.ignite.raft.jraft.util.Requires;
+
+/**
+ * Stores RAFT log in memory.
+ */
+public class VolatileLogStorage implements LogStorage, Describer, 
VolatileStorage {
+    private static final IgniteLogger LOG = 
Loggers.forClass(VolatileLogStorage.class);
+
+    private final LogStorageBudget budget;
+
+    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    private final Lock readLock = this.readWriteLock.readLock();
+    private final Lock writeLock = this.readWriteLock.writeLock();
+
+    private final NavigableMap<Long, LogEntry> log = new 
ConcurrentSkipListMap<>();
+
+    private LogEntryEncoder logEntryEncoder;
+    private LogEntryDecoder logEntryDecoder;
+
+    private volatile long firstLogIndex = 1;
+    private volatile long lastLogIndex = 0;
+
+    private volatile boolean initialized = false;
+
+    public VolatileLogStorage(LogStorageBudget budget) {
+        super();
+
+        this.budget = budget;
+    }
+
+    @Override
+    public boolean init(final LogStorageOptions opts) {
+        Requires.requireNonNull(opts.getConfigurationManager(), "Null conf 
manager");
+        Requires.requireNonNull(opts.getLogEntryCodecFactory(), "Null log 
entry codec factory");
+
+        this.writeLock.lock();
+
+        try {
+            if (initialized) {
+                LOG.warn("VolatileLogStorage init() was already called.");
+                return true;
+            }
+            this.initialized = true;
+            this.logEntryDecoder = opts.getLogEntryCodecFactory().decoder();
+            this.logEntryEncoder = opts.getLogEntryCodecFactory().encoder();
+            Requires.requireNonNull(this.logEntryDecoder, "Null log entry 
decoder");
+            Requires.requireNonNull(this.logEntryEncoder, "Null log entry 
encoder");
+
+            return true;
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        this.writeLock.lock();
+
+        try {
+            this.initialized = false;
+            this.log.clear();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    @Override
+    public long getFirstLogIndex() {
+        this.readLock.lock();
+
+        try {
+            return this.firstLogIndex;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public long getLastLogIndex() {
+        this.readLock.lock();
+
+        try {
+            return this.lastLogIndex;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public LogEntry getEntry(final long index) {
+        this.readLock.lock();
+
+        try {
+            if (index < getFirstLogIndex()) {
+                return null;
+            }
+
+            return log.get(index);
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public long getTerm(final long index) {
+        final LogEntry entry = getEntry(index);
+        if (entry != null) {
+            return entry.getId().getTerm();
+        }
+        return 0;
+    }
+
+    @Override
+    public boolean appendEntry(final LogEntry entry) {
+        this.readLock.lock();
+
+        try {
+            if (!initialized) {
+                LOG.warn("DB not initialized or destroyed.");
+                return false;
+            }
+
+            this.log.put(entry.getId().getIndex(), entry);
+
+            lastLogIndex = log.lastKey();
+            firstLogIndex = log.firstKey();
+
+            budget.onAppended(entry);
+
+            return true;
+        } finally {
+            this.readLock.unlock();
+        }
+    }
+
+    @Override
+    public int appendEntries(final List<LogEntry> entries) {
+        if (entries == null || entries.isEmpty()) {
+            return 0;
+        }
+
+        final int entriesCount = entries.size();
+
+        this.readLock.lock();
+
+        try {
+            if (!initialized) {
+                LOG.warn("DB not initialized or destroyed.");
+                return 0;
+            }
+
+            for (LogEntry logEntry : entries) {
+                log.put(logEntry.getId().getIndex(), logEntry);
+            }
+
+            lastLogIndex = log.lastKey();
+            firstLogIndex = log.firstKey();
+
+            budget.onAppended(entries);

Review Comment:
   I don't see any consequences when there is no room for entries; why do we 
even need this budget logic then?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to