tkalkirill commented on a change in pull request #9327:
URL: https://github.com/apache/ignite/pull/9327#discussion_r710129931



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
##########
@@ -89,16 +88,28 @@
      */
     public void stop();
 
+    /**
+     * Pre-create single partition that resides in page memory or WAL and 
restores their state.
+     *
+     * @param p Partition id.
+     * @param recoveryState Partition recovery state.
+     * @return Processing time in millis.
+     * @throws IgniteCheckedException If failed.
+     */
+    long restoreStateOfPartition(int p, @Nullable Integer recoveryState) 
throws IgniteCheckedException;

Review comment:
       Maybe a better name: `restorePartitionState`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
+            Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new 
ConcurrentHashMap<>();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            final int topPartRefLimit = 5;
 
-                        totalProcessed.addAndGet(processed.size());
+            for (CacheGroupContext grpCtx : forGroups) {
+                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+                    final int partId = i;
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+                    sysPool.execute(() -> {
+                        GroupPartitionId grpPartId = new 
GroupPartitionId(grpCtx.groupId(), partId);
 
-                            long ts = System.currentTimeMillis();
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),

Review comment:
       You can use `partId` instead `grpPartId.getPartitionId()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
+            Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new 
ConcurrentHashMap<>();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            final int topPartRefLimit = 5;
 
-                        totalProcessed.addAndGet(processed.size());
+            for (CacheGroupContext grpCtx : forGroups) {
+                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+                    final int partId = i;
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+                    sysPool.execute(() -> {
+                        GroupPartitionId grpPartId = new 
GroupPartitionId(grpCtx.groupId(), partId);
 
-                            long ts = System.currentTimeMillis();
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+                            if (log.isInfoEnabled()) {
+                                T3<Long, Long, GroupPartitionId> curPart = new 
T3<>(time, U.currentTimeMillis(), grpPartId);
 
-                                trimToSize(top, 5);
-                            }
+                                RestorePartitionStateThreadContext threadCtx = 
threadCtxs.computeIfAbsent(
+                                    Thread.currentThread(),
+                                    t -> new 
RestorePartitionStateThreadContext()
+                                );
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                                Comparator<T3<Long, Long, GroupPartitionId>> 
cmp = processedPartitionComparator();
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : 
top0) {
-                                    top.add(t2);
+                                threadCtx.topPartRef.updateAndGet(top0 -> {
+                                    if (threadCtx.topPartRef.get() == null ||
+                                        
cmp.compare(threadCtx.topPartRef.get().last(), curPart) < 0) {

Review comment:
       `threadCtx.topPartRef.get().last()` -> `prev.last()`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
+            Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new 
ConcurrentHashMap<>();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            final int topPartRefLimit = 5;
 
-                        totalProcessed.addAndGet(processed.size());
+            for (CacheGroupContext grpCtx : forGroups) {
+                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+                    final int partId = i;
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+                    sysPool.execute(() -> {
+                        GroupPartitionId grpPartId = new 
GroupPartitionId(grpCtx.groupId(), partId);
 
-                            long ts = System.currentTimeMillis();
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+                            if (log.isInfoEnabled()) {
+                                T3<Long, Long, GroupPartitionId> curPart = new 
T3<>(time, U.currentTimeMillis(), grpPartId);
 
-                                trimToSize(top, 5);
-                            }
+                                RestorePartitionStateThreadContext threadCtx = 
threadCtxs.computeIfAbsent(
+                                    Thread.currentThread(),
+                                    t -> new 
RestorePartitionStateThreadContext()
+                                );
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                                Comparator<T3<Long, Long, GroupPartitionId>> 
cmp = processedPartitionComparator();
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : 
top0) {
-                                    top.add(t2);
+                                threadCtx.topPartRef.updateAndGet(top0 -> {
+                                    if (threadCtx.topPartRef.get() == null ||
+                                        
cmp.compare(threadCtx.topPartRef.get().last(), curPart) < 0) {
+                                        SortedSet<T3<Long, Long, 
GroupPartitionId>> top = new TreeSet<>(cmp);
 
-                                    trimToSize(top, 5);
-                                }
+                                        top.add(curPart);
 
-                                return top;
-                            });
+                                        if (top0 != null)
+                                            top.addAll(top0);
+
+                                        trimToSize(top, topPartRefLimit);
+
+                                        return top;
+                                    }
+                                    else
+                                        return top0;
+                                });
+
+                                
RestorePartitionStateThreadContext.PROCESSED_CNT_UPD.incrementAndGet(threadCtx);

Review comment:
       Let's make it a `RestorePartitionStateThreadContext` method.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
+            Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new 
ConcurrentHashMap<>();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            final int topPartRefLimit = 5;
 
-                        totalProcessed.addAndGet(processed.size());
+            for (CacheGroupContext grpCtx : forGroups) {
+                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+                    final int partId = i;
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+                    sysPool.execute(() -> {
+                        GroupPartitionId grpPartId = new 
GroupPartitionId(grpCtx.groupId(), partId);
 
-                            long ts = System.currentTimeMillis();
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+                            if (log.isInfoEnabled()) {
+                                T3<Long, Long, GroupPartitionId> curPart = new 
T3<>(time, U.currentTimeMillis(), grpPartId);
 
-                                trimToSize(top, 5);
-                            }
+                                RestorePartitionStateThreadContext threadCtx = 
threadCtxs.computeIfAbsent(
+                                    Thread.currentThread(),
+                                    t -> new 
RestorePartitionStateThreadContext()
+                                );
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                                Comparator<T3<Long, Long, GroupPartitionId>> 
cmp = processedPartitionComparator();
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : 
top0) {
-                                    top.add(t2);
+                                threadCtx.topPartRef.updateAndGet(top0 -> {

Review comment:
       Please rename `top0` -> `prev`.
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
             if (log.isInfoEnabled())
                 log.info("Restoring partition state for local groups.");
 
-            AtomicLong totalProcessed = new AtomicLong();
-
             AtomicReference<IgniteCheckedException> restoreStateError = new 
AtomicReference<>();
 
             ExecutorService sysPool = ctx.pools().getSystemExecutorService();
 
-            CountDownLatch completionLatch = new 
CountDownLatch(forGroups.size());
+            final int totalPart = forGroups.stream().mapToInt(grpCtx -> 
grpCtx.affinity().partitions()).sum();
 
-            AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>> 
topPartRef = new AtomicReference<>();
+            CountDownLatch completionLatch = new CountDownLatch(totalPart);
 
-            long totalPart = forGroups.stream().mapToLong(grpCtx -> 
grpCtx.affinity().partitions()).sum();
+            Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new 
ConcurrentHashMap<>();
 
-            for (CacheGroupContext grp : forGroups) {
-                sysPool.execute(() -> {
-                    try {
-                        Map<Integer, Long> processed = 
grp.offheap().restorePartitionStates(partStates);
+            final int topPartRefLimit = 5;
 
-                        totalProcessed.addAndGet(processed.size());
+            for (CacheGroupContext grpCtx : forGroups) {
+                for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+                    final int partId = i;
 
-                        if (log.isInfoEnabled()) {
-                            TreeSet<T3<Long, Long, GroupPartitionId>> top =
-                                new TreeSet<>(processedPartitionComparator());
+                    sysPool.execute(() -> {
+                        GroupPartitionId grpPartId = new 
GroupPartitionId(grpCtx.groupId(), partId);
 
-                            long ts = System.currentTimeMillis();
+                        try {
+                            long time = 
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+                                partStates.get(grpPartId));
 
-                            for (Map.Entry<Integer, Long> e : 
processed.entrySet()) {
-                                top.add(new T3<>(e.getValue(), ts, new 
GroupPartitionId(grp.groupId(), e.getKey())));
+                            if (log.isInfoEnabled()) {
+                                T3<Long, Long, GroupPartitionId> curPart = new 
T3<>(time, U.currentTimeMillis(), grpPartId);
 
-                                trimToSize(top, 5);
-                            }
+                                RestorePartitionStateThreadContext threadCtx = 
threadCtxs.computeIfAbsent(
+                                    Thread.currentThread(),
+                                    t -> new 
RestorePartitionStateThreadContext()
+                                );
 
-                            topPartRef.updateAndGet(top0 -> {
-                                if (top0 == null)
-                                    return top;
+                                Comparator<T3<Long, Long, GroupPartitionId>> 
cmp = processedPartitionComparator();
 
-                                for (T3<Long, Long, GroupPartitionId> t2 : 
top0) {
-                                    top.add(t2);
+                                threadCtx.topPartRef.updateAndGet(top0 -> {
+                                    if (threadCtx.topPartRef.get() == null ||

Review comment:
       Use `prev == null`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
##########
@@ -89,16 +88,28 @@
      */
     public void stop();
 
+    /**
+     * Pre-create single partition that resides in page memory or WAL and 
restores their state.
+     *
+     * @param p Partition id.
+     * @param recoveryState Partition recovery state.
+     * @return Processing time in millis.
+     * @throws IgniteCheckedException If failed.
+     */
+    long restoreStateOfPartition(int p, @Nullable Integer recoveryState) 
throws IgniteCheckedException;
+
     /**
      * Pre-create partitions that resides in page memory or WAL and restores 
their state.
      *
-     * @param partRecoveryStates Partition recovery states.
-     * @return Processed partitions: partition id -> processing time in millis.
      * @throws IgniteCheckedException If failed.
      */
-    Map<Integer, Long> restorePartitionStates(
-        Map<GroupPartitionId, Integer> partRecoveryStates
-    ) throws IgniteCheckedException;
+    void restorePartitionStates() throws IgniteCheckedException;
+
+    /**
+     * Confirm that partition states are restored. This method should be 
called after restoring state of all partitions
+     * in group using {@link #restoreStateOfPartition(int, Integer)}.
+     */
+    void confirmPartitionStatesRestored();

Review comment:
       It's look like callback: `onRestorePartitionStates`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to