Mmuzaf commented on code in PR #10360:
URL: https://github.com/apache/ignite/pull/10360#discussion_r1017893630


##########
docs/_docs/snapshots/snapshots.adoc:
##########
@@ -95,6 +95,19 @@ increases the total amount of time for taking a snapshot. 
However, this keeps th
 
 See the link:perf-and-troubleshooting/thread-pools-tuning[Ignite Snapshot 
Execution Pool,window=_blank] page for more details.
 
+=== Distributed properties
+
+The distributed properties listed in the table below allow you to configure 
snapshots at runtime:
+
+[cols="1,3,1",opts="header"]
+|===
+|Parameter | Description | Default Value
+|`snapshotDeltaSortBatchSize`| Snapshot delta sort batch size in pages count. 
If set then delta pages will be indexed
+by page index to almost sequential disk write on apply. This generates an 
extra delta read. If value is non-positive or
+not set then delta pages will be applied directly. | 0

Review Comment:
   I think we should add some examples of what the property value may be e.g. 
20% of the total amount of partition size.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -3254,19 +3271,73 @@ public LocalSnapshotSender(String snpName, @Nullable 
String snpPath) {
 
                 assert totalBytes % pageSize == 0 : "Given file with delta 
pages has incorrect size: " + fileIo.size();
 
+                int pagesCnt = (int)(totalBytes / pageSize);
+
+                Integer batchSize = deltaSortBatch.get();
+
+                GridIntIterator iter;
+
+                if (batchSize == null || batchSize <= 0)
+                    iter = U.forRange(0, pagesCnt);
+                else {
+                    iter = new GridIntIterator() {
+                        private int idx = 0;
+
+                        private Iterator<Integer> sortedIter;
+
+                        @Override public boolean hasNext() {
+                            if (sortedIter == null || !sortedIter.hasNext()) {
+                                try {
+                                    advance();
+                                }
+                                catch (Exception e) {
+                                    throw new IgniteException(e);
+                                }
+                            }
+
+                            return sortedIter.hasNext();
+                        }
+
+                        @Override public int next() {
+                            return sortedIter.next();

Review Comment:
   It will produce NullPointerExcepction here if we call the `next` first time. 
It seems it should be IllegalStateException or something similar.



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -3254,19 +3271,73 @@ public LocalSnapshotSender(String snpName, @Nullable 
String snpPath) {
 
                 assert totalBytes % pageSize == 0 : "Given file with delta 
pages has incorrect size: " + fileIo.size();
 
+                int pagesCnt = (int)(totalBytes / pageSize);
+
+                Integer batchSize = deltaSortBatch.get();
+
+                GridIntIterator iter;
+
+                if (batchSize == null || batchSize <= 0)
+                    iter = U.forRange(0, pagesCnt);
+                else {
+                    iter = new GridIntIterator() {
+                        private int idx = 0;
+
+                        private Iterator<Integer> sortedIter;
+
+                        @Override public boolean hasNext() {
+                            if (sortedIter == null || !sortedIter.hasNext()) {
+                                try {
+                                    advance();
+                                }
+                                catch (Exception e) {
+                                    throw new IgniteException(e);
+                                }
+                            }
+
+                            return sortedIter.hasNext();
+                        }
+
+                        @Override public int next() {
+                            return sortedIter.next();
+                        }
+
+                        private void advance() throws Exception {
+                            TreeMap<Integer, Integer> sorted = new TreeMap<>();
+
+                            while (idx < pagesCnt && sorted.size() < 
batchSize) {
+                                transferRateLimiter.acquire(pageSize);

Review Comment:
   We are limiting only writes, why we should use the same for reads? 



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -394,6 +403,13 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
     /** Snapshot transfer rate limit in bytes/sec. */
     private final DistributedLongProperty snapshotTransferRate = 
detachedLongProperty(SNAPSHOT_TRANSFER_RATE_DMS_KEY);
 
+    /**
+     * Snapshot delta sort batch size in pages count. If set then delta pages 
will be indexed by page index to almost
+     * sequential disk write on apply. This generates an extra delta read. If 
value is non-positive or not set then delta
+     * pages will be applied directly.
+     */
+    private final DistributedIntegerProperty deltaSortBatch = 
detachedIntegerProperty(SNAPSHOT_DELTA_SORT_BATCH_SIZE_KEY);

Review Comment:
   This will not work if encryption is enabled, right?



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -394,6 +403,13 @@ public class IgniteSnapshotManager extends 
GridCacheSharedManagerAdapter
     /** Snapshot transfer rate limit in bytes/sec. */
     private final DistributedLongProperty snapshotTransferRate = 
detachedLongProperty(SNAPSHOT_TRANSFER_RATE_DMS_KEY);
 
+    /**
+     * Snapshot delta sort batch size in pages count. If set then delta pages 
will be indexed by page index to almost
+     * sequential disk write on apply. This generates an extra delta read. If 
value is non-positive or not set then delta
+     * pages will be applied directly.
+     */
+    private final DistributedIntegerProperty deltaSortBatch = 
detachedIntegerProperty(SNAPSHOT_DELTA_SORT_BATCH_SIZE_KEY);

Review Comment:
   I'd rather avoid exposing some of the snapshot internal process things like 
"writing delta" or so on to a user.
   
   Actually, I think it would be more friendly for a user to have a snapshot 
configuration property of a snapshot write mode `RANDOM` or `SEQUENTIAL`. The 
batch must be predefined by the system.
   
   We don't even need to change this parameter at runtime. 



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
+import 
org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_DELTA_SORT_BATCH_SIZE_KEY;
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Cluster snapshot delta tests.
+ */
+@RunWith(Parameterized.class)
+public class IgniteClusterSnapshotDeltaTest extends AbstractSnapshotSelfTest {
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean sorted;
+
+    /** Parameters. */
+    @Parameterized.Parameters(name = "encryption={0}, sorted={1}")
+    public static Collection<Object[]> parameters() {
+        return cartesianProduct(encryptionParams(), F.asList(false, true));
+    }
+
+    /** @throws Exception If failed. */
+    @Test
+    public void testSendDelta() throws Exception {
+        int keys = 1_000;
+        byte[] payload = new byte[DFLT_PAGE_SIZE / 2];
+        int partCnt = 2;
+        int batchSize = keys / 10;
+
+        // 1. Start a cluster and fill cache.
+        ThreadLocalRandom.current().nextBytes(payload);
+
+        byte[] expPayload = Arrays.copyOf(payload, payload.length);
+
+        CacheConfiguration<Integer, byte[]> ccfg = new 
CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
+            .setAffinity(new RendezvousAffinityFunction(false, partCnt));
+
+        String cacheDir = CACHE_DIR_PREFIX + DEFAULT_CACHE_NAME;
+
+        IgniteEx srv = startGridsWithCache(1, keys, (k) -> expPayload, ccfg);
+
+        if (sorted) {
+            setDeltaSortBatchSize(batchSize);
+
+            injectSequentialWriteCheck(srv, batchSize);

Review Comment:
   Why this factory can't be used as a JUnit test param?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to