tkalkirill commented on a change in pull request #9327:
URL: https://github.com/apache/ignite/pull/9327#discussion_r705309062
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5520,89 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ // Group id -> completed partitions counter.
+ Map<Integer, AtomicInteger> grps = new ConcurrentHashMap<>();
Review comment:
I think we can get rid of this map.
We can call `IgniteCacheOffheapManager#confirmPartitionStatesRestored`
once all partitions of all groups have been restored.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5614,17 +5642,44 @@ private void restorePartitionStates(
throw restoreStateError.get();
if (log.isInfoEnabled()) {
- SortedSet<T3<Long, Long, GroupPartitionId>> t = printTop ?
topPartRef.get() : null;
+ SortedSet<T3<Long, Long, GroupPartitionId>> t =
+ printTop ? collectTopProcessedParts(threadCtxs,
topPartRefLimit) : null;
+
+ long totalProcessed = threadCtxs.stream().mapToLong(c ->
c.processedCnt).sum();
log.info("Finished restoring partition state for local groups
[" +
"groupsProcessed=" + forGroups.size() +
- ", partitionsProcessed=" + totalProcessed.get() +
+ ", partitionsProcessed=" + totalProcessed +
", time=" + U.humanReadableDuration(U.currentTimeMillis()
- startRestorePart) +
(t == null ? "" : ", topProcessedPartitions=" +
toStringTopProcessingPartitions(t, forGroups)) +
"]");
}
}
+ /**
+ * Collects top processed partitions from thread local contexts of
restore partition state process.
+ *
+ * @param threadCtxs Thread local contexts.
+ * @param topPartRefLimit Limit of top partitions collection size.
+ */
+ private SortedSet<T3<Long, Long, GroupPartitionId>>
collectTopProcessedParts(
Review comment:
This may not work correctly when accessed from multiple threads.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5520,89 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ // Group id -> completed partitions counter.
+ Map<Integer, AtomicInteger> grps = new ConcurrentHashMap<>();
- long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ Collection<RestorePartitionStateThreadContext> threadCtxs = new
CopyOnWriteArrayList<>();
- totalProcessed.addAndGet(processed.size());
+ ThreadLocal<RestorePartitionStateThreadContext> threadLocalCtx =
Review comment:
I think we should get rid of this `ThreadLocal`.
Let's use a striped map instead.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5939,13 +5994,25 @@ static String toStringTopProcessingPartitions(
/**
* Comparator of processed partitions.
* T3 -> 1 - duration, 2 - timestamp, 3 - partition of group.
- * Sort order: duration -> timestamp -> partition of group.
+ * Sort order: duration -> timestamp (reversed order) -> partition of
group.
*
* @return Comparator.
*/
static Comparator<T3<Long, Long, GroupPartitionId>>
processedPartitionComparator() {
Comparator<T3<Long, Long, GroupPartitionId>> comp =
Comparator.comparing(T3::get1);
- return comp.thenComparing(T3::get2).thenComparing(T3::get3);
+ return comp.thenComparing(T3::get2,
Comparator.reverseOrder()).thenComparing(T3::get3);
+ }
+
+ /**
+ * Thread local context of restore partition state progress.
+ */
+ private static class RestorePartitionStateThreadContext {
+ /** Top partitions by processing time. */
+ final AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef =
+ new AtomicReference<>();
+
+ /** Processed partitions count. It is always updated from the same
thread. */
+ volatile long processedCnt = 0;
Review comment:
Please use `java.util.concurrent.atomic.AtomicLongFieldUpdater` here.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5520,89 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ // Group id -> completed partitions counter.
+ Map<Integer, AtomicInteger> grps = new ConcurrentHashMap<>();
- long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ Collection<RestorePartitionStateThreadContext> threadCtxs = new
CopyOnWriteArrayList<>();
- totalProcessed.addAndGet(processed.size());
+ ThreadLocal<RestorePartitionStateThreadContext> threadLocalCtx =
+ ThreadLocal.withInitial(() -> {
+ RestorePartitionStateThreadContext threadCtx = new
RestorePartitionStateThreadContext();
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ threadCtxs.add(threadCtx);
- long ts = System.currentTimeMillis();
+ return threadCtx;
+ });
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ final int topPartRefLimit = 5;
- trimToSize(top, 5);
- }
+ for (CacheGroupContext grpCtx : forGroups) {
+ for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+ final int partId = i;
+
+ sysPool.execute(() -> {
+ GroupPartitionId grpPartId = new
GroupPartitionId(grpCtx.groupId(), partId);
+
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
+
+ if (log.isInfoEnabled()) {
+ T3<Long, Long, GroupPartitionId> curPart = new
T3<>(time, U.currentTimeMillis(), grpPartId);
+
+ RestorePartitionStateThreadContext threadCtx =
threadLocalCtx.get();
+
+ Comparator<T3<Long, Long, GroupPartitionId>>
cmp = processedPartitionComparator();
+
+ if (threadCtx.topPartRef.get() == null ||
Review comment:
This may not work correctly when accessed from multiple threads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]