tkalkirill commented on a change in pull request #9243:
URL: https://github.com/apache/ignite/pull/9243#discussion_r686525840



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            int poolSize = ctx.config().getSystemThreadPoolSize();
 
-                        totalProcessed.addAndGet(processed.size());
+            List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
 
-                            long ts = System.currentTimeMillis();
+            int cntr = 0;
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            // Group id -> completed partitions counter
+            Map<Integer, AtomicInteger> grps = new HashMap<>();
 
-                                trimToSize(top, 5);
-                            }
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new AtomicInteger());
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                for (int i = 0; i < ctx.affinity().partitions(); i++)
+                    partIds.get(cntr++ % poolSize).add(new 
GroupPartitionId(ctx.groupId(), i));
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            final int topPartRefLimit = 5;
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.execute(() -> {
+                    Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+                    SortedSet<T3<Long, Long, GroupPartitionId>> top =
+                        new 
ConcurrentSkipListSet<>(processedPartitionComparator());

Review comment:
       A concurrent collection is not needed here: `top` is only accessed by the single thread running this task.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            int poolSize = ctx.config().getSystemThreadPoolSize();
 
-                        totalProcessed.addAndGet(processed.size());
+            List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
 
-                            long ts = System.currentTimeMillis();
+            int cntr = 0;
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            // Group id -> completed partitions counter

Review comment:
       The comment is missing a period at the end.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
+            int poolSize = ctx.config().getSystemThreadPoolSize();
+
+            List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());

Review comment:
       It is better to use `java.util.ArrayDeque` here instead of `LinkedList`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -615,131 +615,140 @@ else if (needSnapshot)
     }
 
     /** {@inheritDoc} */
-    @Override public Map<Integer, Long> restorePartitionStates(
-        Map<GroupPartitionId, Integer> partRecoveryStates
-    ) throws IgniteCheckedException {
+    @Override public long restoreStateOfPartition(int p, @Nullable Integer 
recoveryState) throws IgniteCheckedException {
         if (grp.isLocal() || !grp.affinityNode() || 
!grp.dataRegion().config().isPersistenceEnabled()
             || partitionStatesRestored)
-            return Collections.emptyMap();
-
-        Map<Integer, Long> processed = new HashMap<>();
+            return 0;
 
         PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
 
-        for (int p = 0; p < grp.affinity().partitions(); p++) {
-            Integer recoverState = partRecoveryStates.get(new 
GroupPartitionId(grp.groupId(), p));
-
-            long startTime = U.currentTimeMillis();
+        long startTime = U.currentTimeMillis();
 
-            if (log.isDebugEnabled())
-                log.debug("Started restoring partition state [grp=" + 
grp.cacheOrGroupName() + ", p=" + p + ']');
-
-            if (ctx.pageStore().exists(grp.groupId(), p)) {
-                ctx.pageStore().ensure(grp.groupId(), p);
+        long res = 0;
 
-                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Skipping partition on recovery (pages less 
than 1) " +
-                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + 
']');
-                    }
+        if (log.isDebugEnabled())
+            log.debug("Started restoring partition state [grp=" + 
grp.cacheOrGroupName() + ", p=" + p + ']');
 
-                    continue;
-                }
+        if (ctx.pageStore().exists(grp.groupId(), p)) {
+            ctx.pageStore().ensure(grp.groupId(), p);
 
+            if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
                 if (log.isDebugEnabled()) {
-                    log.debug("Creating partition on recovery (exists in page 
store) " +
+                    log.debug("Skipping partition on recovery (pages less than 
1) " +

Review comment:
       The message should say "less than or equal to 1", since the condition is `<= 1`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            int poolSize = ctx.config().getSystemThreadPoolSize();
 
-                        totalProcessed.addAndGet(processed.size());
+            List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
 
-                            long ts = System.currentTimeMillis();
+            int cntr = 0;
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            // Group id -> completed partitions counter
+            Map<Integer, AtomicInteger> grps = new HashMap<>();
 
-                                trimToSize(top, 5);
-                            }
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new AtomicInteger());
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                for (int i = 0; i < ctx.affinity().partitions(); i++)
+                    partIds.get(cntr++ % poolSize).add(new 
GroupPartitionId(ctx.groupId(), i));
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            final int topPartRefLimit = 5;
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.execute(() -> {
+                    Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+                    SortedSet<T3<Long, Long, GroupPartitionId>> top =
+                        new 
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+                    while (!batch.isEmpty()) {
+                        GroupPartitionId grpPartId = batch.poll();
+
+                        CacheGroupContext grpCtx = 
ctx.cache().cacheGroup(grpPartId.getGroupId());
+
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : 
top0) {
-                                    top.add(t2);
+                            top.add(new T3<>(time, U.currentTimeMillis(), 
grpPartId));

Review comment:
       This should only be done if `log.isInfoEnabled()` is true.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            int poolSize = ctx.config().getSystemThreadPoolSize();
 
-                        totalProcessed.addAndGet(processed.size());
+            List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
 
-                            long ts = System.currentTimeMillis();
+            int cntr = 0;
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            // Group id -> completed partitions counter
+            Map<Integer, AtomicInteger> grps = new HashMap<>();
 
-                                trimToSize(top, 5);
-                            }
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new AtomicInteger());
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                for (int i = 0; i < ctx.affinity().partitions(); i++)
+                    partIds.get(cntr++ % poolSize).add(new 
GroupPartitionId(ctx.groupId(), i));
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            final int topPartRefLimit = 5;
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.execute(() -> {
+                    Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+                    SortedSet<T3<Long, Long, GroupPartitionId>> top =
+                        new 
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+                    while (!batch.isEmpty()) {

Review comment:
       You can replace the predicate with `(grpPartId = batch.poll()) != null`.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            int poolSize = ctx.config().getSystemThreadPoolSize();
 
-                        totalProcessed.addAndGet(processed.size());
+            List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
 
-                            long ts = System.currentTimeMillis();
+            int cntr = 0;
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            // Group id -> completed partitions counter
+            Map<Integer, AtomicInteger> grps = new HashMap<>();

Review comment:
       `totalProcessed` can be replaced with the sum of all counters in this 
map.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            int poolSize = ctx.config().getSystemThreadPoolSize();
 
-                        totalProcessed.addAndGet(processed.size());
+            List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
 
-                            long ts = System.currentTimeMillis();
+            int cntr = 0;
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            // Group id -> completed partitions counter
+            Map<Integer, AtomicInteger> grps = new HashMap<>();
 
-                                trimToSize(top, 5);
-                            }
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new AtomicInteger());
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                for (int i = 0; i < ctx.affinity().partitions(); i++)
+                    partIds.get(cntr++ % poolSize).add(new 
GroupPartitionId(ctx.groupId(), i));
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            final int topPartRefLimit = 5;
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.execute(() -> {
+                    Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+                    SortedSet<T3<Long, Long, GroupPartitionId>> top =
+                        new 
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+                    while (!batch.isEmpty()) {
+                        GroupPartitionId grpPartId = batch.poll();
+
+                        CacheGroupContext grpCtx = 
ctx.cache().cacheGroup(grpPartId.getGroupId());
+
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : 
top0) {
-                                    top.add(t2);
+                            top.add(new T3<>(time, U.currentTimeMillis(), 
grpPartId));
 
-                                    trimToSize(top, 5);
-                                }
+                            trimToSize(top, topPartRefLimit);

Review comment:
       This should only be done if `log.isInfoEnabled()` is true.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
-
             AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
 
             long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            int poolSize = ctx.config().getSystemThreadPoolSize();
 
-                        totalProcessed.addAndGet(processed.size());
+            List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+            for (int i = 0; i < poolSize; i++)
+                partIds.add(new LinkedList<>());
 
-                            long ts = System.currentTimeMillis();
+            int cntr = 0;
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+            // Group id -> completed partitions counter
+            Map<Integer, AtomicInteger> grps = new HashMap<>();
 
-                                trimToSize(top, 5);
-                            }
+            for (CacheGroupContext ctx : forGroups) {
+                grps.put(ctx.groupId(), new AtomicInteger());
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                for (int i = 0; i < ctx.affinity().partitions(); i++)
+                    partIds.get(cntr++ % poolSize).add(new 
GroupPartitionId(ctx.groupId(), i));
+            }
+
+            CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+            final int topPartRefLimit = 5;
+
+            for (int i = 0; i < poolSize; i++) {
+                final int batchIdx = i;
+
+                sysPool.execute(() -> {
+                    Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+                    SortedSet<T3<Long, Long, GroupPartitionId>> top =
+                        new 
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+                    while (!batch.isEmpty()) {
+                        GroupPartitionId grpPartId = batch.poll();
+
+                        CacheGroupContext grpCtx = 
ctx.cache().cacheGroup(grpPartId.getGroupId());
+
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : 
top0) {
-                                    top.add(t2);
+                            top.add(new T3<>(time, U.currentTimeMillis(), 
grpPartId));
 
-                                    trimToSize(top, 5);
-                                }
+                            trimToSize(top, topPartRefLimit);
 
-                                return top;
-                            });
+                            totalProcessed.incrementAndGet();
                         }
-                    }
-                    catch (IgniteCheckedException | RuntimeException | Error 
e) {
-                        U.error(log, "Failed to restore partition state for " +
-                            "groupName=" + grp.name() + " groupId=" + 
grp.groupId(), e);
+                        catch (IgniteCheckedException | RuntimeException | 
Error e) {
+                            U.error(log, "Failed to restore partition state 
for " +
+                                "groupName=" + grpCtx.name() + " groupId=" + 
grpCtx.groupId(), e);
 
-                        restoreStateError.compareAndSet(
-                            null,
-                            e instanceof IgniteCheckedException
+                            IgniteCheckedException ex = e instanceof 
IgniteCheckedException
                                 ? ((IgniteCheckedException)e)
-                                : new IgniteCheckedException(e)
-                        );
-                    }
-                    finally {
-                        completionLatch.countDown();
+                                : new IgniteCheckedException(e);
+
+                            if (!restoreStateError.compareAndSet(null, ex))
+                                restoreStateError.get().addSuppressed(ex);
+                        }
+                        finally {
+                            completionLatch.countDown();
+
+                            AtomicInteger completedCntr = 
grps.get(grpPartId.getGroupId());
+
+                            if (completedCntr.incrementAndGet() == 
grpCtx.affinity().partitions())
+                                
grpCtx.offheap().confirmPartitionStatesRestored();
+
+                            if (log.isInfoEnabled()) {
+                                topPartRef.updateAndGet(top0 -> {

Review comment:
       This code is executed for each partition, so the local collection 
(`SortedSet<T3<Long, Long, GroupPartitionId>> top`) is not needed. I think this 
will be a point of high contention; perhaps you should think about how to 
reduce or remove it.
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to