tkalkirill commented on a change in pull request #9327:
URL: https://github.com/apache/ignite/pull/9327#discussion_r705309062



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5520,89 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            // Group id -> completed partitions counter.
+            Map<Integer, AtomicInteger> grps = new ConcurrentHashMap<>();

Review comment:
       I think we can get rid of it. 
   We can call the `IgniteCacheOffheapManager#confirmPartitionStatesRestored` 
after restoring all partitions for all groups.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5614,17 +5642,44 @@ private void restorePartitionStates(
                 throw restoreStateError.get();
 
             if (log.isInfoEnabled()) {
-                SortedSet<T3<Long, Long, GroupPartitionId>> t = printTop ? 
topPartRef.get() : null;
+                SortedSet<T3<Long, Long, GroupPartitionId>> t =
+                    printTop ? collectTopProcessedParts(threadCtxs, 
topPartRefLimit) : null;
+
+                long totalProcessed = threadCtxs.stream().mapToLong(c -> 
c.processedCnt).sum();
 
                 log.info("Finished restoring partition state for local groups 
[" +
                     "groupsProcessed=" + forGroups.size() +
-                    ", partitionsProcessed=" + totalProcessed.get() +
+                    ", partitionsProcessed=" + totalProcessed +
                     ", time=" + U.humanReadableDuration(U.currentTimeMillis() 
- startRestorePart) +
                     (t == null ? "" : ", topProcessedPartitions=" + 
toStringTopProcessingPartitions(t, forGroups)) +
                     "]");
             }
         }
 
+        /**
+         * Collects top processed partitions from thread local contexts of 
restore partition state process.
+         *
+         * @param threadCtxs Thread local contexts.
+         * @param topPartRefLimit Limit of top partitions collection size.
+         */
+        private SortedSet<T3<Long, Long, GroupPartitionId>> 
collectTopProcessedParts(

Review comment:
       It may not work correctly in a multi-threaded environment.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5520,89 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            // Group id -> completed partitions counter.
+            Map<Integer, AtomicInteger> grps = new ConcurrentHashMap<>();
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            Collection<RestorePartitionStateThreadContext> threadCtxs = new 
CopyOnWriteArrayList<>();
 
-                        totalProcessed.addAndGet(processed.size());
+            ThreadLocal<RestorePartitionStateThreadContext> threadLocalCtx =

Review comment:
       I think we should get rid of it.
   Let's use a striped map.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5939,13 +5994,25 @@ static String toStringTopProcessingPartitions(
     /**
      * Comparator of processed partitions.
      * T3 -> 1 - duration, 2 - timestamp, 3 - partition of group.
-     * Sort order: duration -> timestamp -> partition of group.
+     * Sort order: duration -> timestamp (reversed order) -> partition of 
group.
      *
      * @return Comparator.
      */
     static Comparator<T3<Long, Long, GroupPartitionId>> 
processedPartitionComparator() {
         Comparator<T3<Long, Long, GroupPartitionId>> comp = 
Comparator.comparing(T3::get1);
 
-        return comp.thenComparing(T3::get2).thenComparing(T3::get3);
+        return comp.thenComparing(T3::get2, 
Comparator.reverseOrder()).thenComparing(T3::get3);
+    }
+
+    /**
+     * Thread local context of restore partition state progress.
+     */
+    private static class RestorePartitionStateThreadContext {
+        /** Top partitions by processing time. */
+        final AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef =
+            new AtomicReference<>();
+
+        /** Processed partitions count. It is always updated from the same 
thread. */
+        volatile long processedCnt = 0;

Review comment:
       Please use `java.util.concurrent.atomic.AtomicLongFieldUpdater`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5520,89 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            // Group id -> completed partitions counter.
+            Map<Integer, AtomicInteger> grps = new ConcurrentHashMap<>();
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            Collection<RestorePartitionStateThreadContext> threadCtxs = new 
CopyOnWriteArrayList<>();
 
-                        totalProcessed.addAndGet(processed.size());
+            ThreadLocal<RestorePartitionStateThreadContext> threadLocalCtx =
+                ThreadLocal.withInitial(() -> {
+                    RestorePartitionStateThreadContext threadCtx = new 
RestorePartitionStateThreadContext();
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+                    threadCtxs.add(threadCtx);
 
-                            long ts = System.currentTimeMillis();
+                    return threadCtx;
+                });
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            final int topPartRefLimit = 5;
 
-                                trimToSize(top, 5);
-                            }
+            for (CacheGroupContext grpCtx : forGroups) {
+                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+                    final int partId = i;
+
+                    sysPool.execute(() -> {
+                        GroupPartitionId grpPartId = new 
GroupPartitionId(grpCtx.groupId(), partId);
+
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
+
+                            if (log.isInfoEnabled()) {
+                                T3<Long, Long, GroupPartitionId> curPart = new 
T3<>(time, U.currentTimeMillis(), grpPartId);
+
+                                RestorePartitionStateThreadContext threadCtx = 
threadLocalCtx.get();
+
+                                Comparator<T3<Long, Long, GroupPartitionId>> 
cmp = processedPartitionComparator();
+
+                                if (threadCtx.topPartRef.get() == null ||

Review comment:
       It may not work correctly in a multi-threaded environment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to