timoninmaxim commented on code in PR #11112:
URL: https://github.com/apache/ignite/pull/11112#discussion_r1432615662


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -561,45 +586,60 @@ public boolean writeForIterator(
             CacheObject val,
             GridCacheVersion ver
         ) {
-            boolean written = true;
+            String reason = null;
 
             if (isAfterStart(ver))
-                written = false;
-            else if (changed.get(cache).contains(key))
-                written = false;
-            else
-                write(cache, expireTime, key, val, ver);
+                reason = "greater version";

Review Comment:
   Javadoc for `return` is outdated



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -614,6 +654,38 @@ private boolean isAfterStart(GridCacheVersion ver) {
             return (startVer != null && ver.isGreater(startVer)) && 
!isolatedStreamerVer.equals(ver);
         }
 
+        /**
+         * Iterator returned by {@link 
IgniteCacheOffheapManager#reservedIterator(int, AffinityTopologyVersion)}
+         * iterates key in ascending order set by {@link 
CacheDataTree#compare(BPlusIO, long, int, CacheSearchRow)}.
+         * So if key changed by the user (see {@link #writeChanged(int, long, 
KeyCacheObject, CacheObject, GridCacheVersion)})
+         * is greater then last key written by partition iterator then it not 
saved in dump, already, and must be written.
+         * Otherwise, key already saved by the iterator and must be skiped.
+         *
+         * @param cache Cache id.
+         * @param key Key to write with {@link #writeChanged(int, long, 
KeyCacheObject, CacheObject, GridCacheVersion)}.
+         * @return {@code 0} if key written by the iterator, already. {@code 
False} otherwise.

Review Comment:
   Wrong return description



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -614,6 +654,38 @@ private boolean isAfterStart(GridCacheVersion ver) {
             return (startVer != null && ver.isGreater(startVer)) && 
!isolatedStreamerVer.equals(ver);
         }
 
+        /**
+         * Iterator returned by {@link 
IgniteCacheOffheapManager#reservedIterator(int, AffinityTopologyVersion)}
+         * iterates key in ascending order set by {@link 
CacheDataTree#compare(BPlusIO, long, int, CacheSearchRow)}.
+         * So if key changed by the user (see {@link #writeChanged(int, long, 
KeyCacheObject, CacheObject, GridCacheVersion)})
+         * is greater then last key written by partition iterator then it not 
saved in dump, already, and must be written.

Review Comment:
   it hasn't been saved in the dump yet and must be written



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -561,45 +586,60 @@ public boolean writeForIterator(
             CacheObject val,
             GridCacheVersion ver
         ) {
-            boolean written = true;
+            String reason = null;
 
             if (isAfterStart(ver))
-                written = false;
-            else if (changed.get(cache).contains(key))
-                written = false;
-            else
-                write(cache, expireTime, key, val, ver);
+                reason = "greater version";
+            else {
+                try {
+                    CacheObjectContext coCtx = cctx.cacheObjectContext(cache);
+
+                    synchronized (serializer) { // Prevent concurrent access 
to the dump file.
+                        iterLastKeyCache = cache;
+                        iterLastKey = key;
+
+                        if (changed.get(cache).contains(key))
+                            reason = "written by listener";
+                        else
+                            write(cache, expireTime, key, val, ver, coCtx);
+                    }
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
 
             if (log.isTraceEnabled()) {
                 log.trace("Iterator [" +
                     "grp=" + grp +
                     ", cache=" + cache +
                     ", part=" + part +
                     ", key=" + key +
-                    ", written=" + written +
-                    ", ver=" + ver + ']');
+                    ", written=" + (reason == null ? "true" : reason) +
+                    ", ver=" + ver +
+                    ", startVer=" + (startVer != null) + ']');
             }
 
-            return written;
+            return reason == null;
         }
 
         /** */
-        private void write(int cache, long expireTime, KeyCacheObject key, 
CacheObject val, GridCacheVersion ver) {
-            synchronized (serializer) { // Prevent concurrent access to the 
dump file.
-                try {
-                    ByteBuffer buf = serializer.writeToBuffer(cache, 
expireTime, key, val, ver, cctx.cacheObjectContext(cache));
+        private void write(
+            int cache,
+            long expireTime,
+            KeyCacheObject key,
+            CacheObject val,
+            GridCacheVersion ver,
+            CacheObjectContext coCtx
+        ) throws IgniteCheckedException, IOException {

Review Comment:
   Why did you move exception handling out of this method? It looks like it 
made the code more complicated.



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/IgniteConcurrentCacheDumpTest.java:
##########
@@ -43,7 +43,7 @@ public static List<Object[]> params() {
         for (boolean encrypted : new boolean[]{true, false})
             for (int nodes : new int[]{2, 3})
                 for (int backups : new int[]{1, 2})
-                    for (boolean persistence : new boolean[]{true, false})
+                    for (boolean persistence : new boolean[]{/*true,*/ false})

Review Comment:
   Why do you need this change?



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -519,12 +529,25 @@ public void writeChanged(int cache, long expireTime, 
KeyCacheObject key, CacheOb
                         reasonToSkip = "partition already saved";
                     else if (isAfterStart(ver))
                         reasonToSkip = "greater version";
-                    else if (!changed.get(cache).add(key)) // Entry changed 
several time during dump.
-                        reasonToSkip = "changed several times";
-                    else if (val == null)
-                        reasonToSkip = "newly created or already removed"; // 
Previous value is null. Entry created after dump start, skip.
                     else {
-                        write(cache, expireTime, key, val, ver);
+                        try {
+                            CacheObjectContext coCtx = 
cctx.cacheObjectContext(cache);
+
+                            synchronized (serializer) { // Prevent concurrent 
access to the dump file.
+                                if (isWrittenByIterator(cache, key, coCtx))
+                                    reasonToSkip = "written by iterator"; // 
Saved by iterator, already. Skip.
+                                else if (!changed.get(cache).add(key)) // 
Entry changed several time during dump.
+                                    reasonToSkip = "changed several times";
+                                else if (val == null)
+                                    // Previous value is null. Entry created 
after dump start, skip.
+                                    reasonToSkip = "newly created or already 
removed";
+                                else
+                                    write(cache, expireTime, key, val, ver, 
coCtx);
+                            }
+                        }
+                        catch (IOException | IgniteCheckedException e) {
+                            throw new IgniteException(e);
+                        }
 
                         changedCnt.increment();

Review Comment:
   Looks like it should be invoked after write() only



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -614,6 +654,38 @@ private boolean isAfterStart(GridCacheVersion ver) {
             return (startVer != null && ver.isGreater(startVer)) && 
!isolatedStreamerVer.equals(ver);
         }
 
+        /**
+         * Iterator returned by {@link 
IgniteCacheOffheapManager#reservedIterator(int, AffinityTopologyVersion)}
+         * iterates key in ascending order set by {@link 
CacheDataTree#compare(BPlusIO, long, int, CacheSearchRow)}.
+         * So if key changed by the user (see {@link #writeChanged(int, long, 
KeyCacheObject, CacheObject, GridCacheVersion)})
+         * is greater then last key written by partition iterator then it not 
saved in dump, already, and must be written.

Review Comment:
   greater than



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -614,6 +654,38 @@ private boolean isAfterStart(GridCacheVersion ver) {
             return (startVer != null && ver.isGreater(startVer)) && 
!isolatedStreamerVer.equals(ver);
         }
 
+        /**
+         * Iterator returned by {@link 
IgniteCacheOffheapManager#reservedIterator(int, AffinityTopologyVersion)}
+         * iterates key in ascending order set by {@link 
CacheDataTree#compare(BPlusIO, long, int, CacheSearchRow)}.
+         * So if key changed by the user (see {@link #writeChanged(int, long, 
KeyCacheObject, CacheObject, GridCacheVersion)})
+         * is greater then last key written by partition iterator then it not 
saved in dump, already, and must be written.
+         * Otherwise, key already saved by the iterator and must be skiped.
+         *
+         * @param cache Cache id.
+         * @param key Key to write with {@link #writeChanged(int, long, 
KeyCacheObject, CacheObject, GridCacheVersion)}.
+         * @return {@code 0} if key written by the iterator, already. {@code 
False} otherwise.
+         * @see CacheDataTree#compareBytes(byte[], byte[])
+         */
+        private boolean isWrittenByIterator(int cache, KeyCacheObject key, 
CacheObjectContext coCtx) throws IgniteCheckedException {

Review Comment:
   Internal API should not use prefixes like "is", "set", "get"



##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/dump/CreateDumpFutureTask.java:
##########
@@ -519,12 +529,25 @@ public void writeChanged(int cache, long expireTime, 
KeyCacheObject key, CacheOb
                         reasonToSkip = "partition already saved";
                     else if (isAfterStart(ver))
                         reasonToSkip = "greater version";
-                    else if (!changed.get(cache).add(key)) // Entry changed 
several time during dump.
-                        reasonToSkip = "changed several times";
-                    else if (val == null)
-                        reasonToSkip = "newly created or already removed"; // 
Previous value is null. Entry created after dump start, skip.
                     else {
-                        write(cache, expireTime, key, val, ver);
+                        try {
+                            CacheObjectContext coCtx = 
cctx.cacheObjectContext(cache);
+
+                            synchronized (serializer) { // Prevent concurrent 
access to the dump file.
+                                if (isWrittenByIterator(cache, key, coCtx))
+                                    reasonToSkip = "written by iterator"; // 
Saved by iterator, already. Skip.
+                                else if (!changed.get(cache).add(key)) // 
Entry changed several time during dump.
+                                    reasonToSkip = "changed several times";
+                                else if (val == null)

Review Comment:
   Let's move this `if` out of the synchronized block



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to