tkalkirill commented on a change in pull request #9243:
URL: https://github.com/apache/ignite/pull/9243#discussion_r686525840
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ int poolSize = ctx.config().getSystemThreadPoolSize();
- totalProcessed.addAndGet(processed.size());
+ List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
- long ts = System.currentTimeMillis();
+ int cntr = 0;
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ // Group id -> completed partitions counter
+ Map<Integer, AtomicInteger> grps = new HashMap<>();
- trimToSize(top, 5);
- }
+ for (CacheGroupContext ctx : forGroups) {
+ grps.put(ctx.groupId(), new AtomicInteger());
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ for (int i = 0; i < ctx.affinity().partitions(); i++)
+ partIds.get(cntr++ % poolSize).add(new
GroupPartitionId(ctx.groupId(), i));
+ }
+
+ CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+ final int topPartRefLimit = 5;
+
+ for (int i = 0; i < poolSize; i++) {
+ final int batchIdx = i;
+
+ sysPool.execute(() -> {
+ Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+ SortedSet<T3<Long, Long, GroupPartitionId>> top =
+ new
ConcurrentSkipListSet<>(processedPartitionComparator());
Review comment:
Concurrent collection for a single thread is not needed.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ int poolSize = ctx.config().getSystemThreadPoolSize();
- totalProcessed.addAndGet(processed.size());
+ List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
- long ts = System.currentTimeMillis();
+ int cntr = 0;
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ // Group id -> completed partitions counter
Review comment:
Forgot the '.' at the end.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,27 +5526,85 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
+ int poolSize = ctx.config().getSystemThreadPoolSize();
+
+ List<List<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
+
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
Review comment:
It would be better to use `java.util.ArrayDeque` instead of `LinkedList` here.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
##########
@@ -615,131 +615,140 @@ else if (needSnapshot)
}
/** {@inheritDoc} */
- @Override public Map<Integer, Long> restorePartitionStates(
- Map<GroupPartitionId, Integer> partRecoveryStates
- ) throws IgniteCheckedException {
+ @Override public long restoreStateOfPartition(int p, @Nullable Integer
recoveryState) throws IgniteCheckedException {
if (grp.isLocal() || !grp.affinityNode() ||
!grp.dataRegion().config().isPersistenceEnabled()
|| partitionStatesRestored)
- return Collections.emptyMap();
-
- Map<Integer, Long> processed = new HashMap<>();
+ return 0;
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
- for (int p = 0; p < grp.affinity().partitions(); p++) {
- Integer recoverState = partRecoveryStates.get(new
GroupPartitionId(grp.groupId(), p));
-
- long startTime = U.currentTimeMillis();
+ long startTime = U.currentTimeMillis();
- if (log.isDebugEnabled())
- log.debug("Started restoring partition state [grp=" +
grp.cacheOrGroupName() + ", p=" + p + ']');
-
- if (ctx.pageStore().exists(grp.groupId(), p)) {
- ctx.pageStore().ensure(grp.groupId(), p);
+ long res = 0;
- if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
- if (log.isDebugEnabled()) {
- log.debug("Skipping partition on recovery (pages less
than 1) " +
- "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
']');
- }
+ if (log.isDebugEnabled())
+ log.debug("Started restoring partition state [grp=" +
grp.cacheOrGroupName() + ", p=" + p + ']');
- continue;
- }
+ if (ctx.pageStore().exists(grp.groupId(), p)) {
+ ctx.pageStore().ensure(grp.groupId(), p);
+ if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
if (log.isDebugEnabled()) {
- log.debug("Creating partition on recovery (exists in page
store) " +
+ log.debug("Skipping partition on recovery (pages less than
1) " +
Review comment:
The message should say "less than or equal to 1", since the check is `<= 1`.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ int poolSize = ctx.config().getSystemThreadPoolSize();
- totalProcessed.addAndGet(processed.size());
+ List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
- long ts = System.currentTimeMillis();
+ int cntr = 0;
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ // Group id -> completed partitions counter
+ Map<Integer, AtomicInteger> grps = new HashMap<>();
- trimToSize(top, 5);
- }
+ for (CacheGroupContext ctx : forGroups) {
+ grps.put(ctx.groupId(), new AtomicInteger());
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ for (int i = 0; i < ctx.affinity().partitions(); i++)
+ partIds.get(cntr++ % poolSize).add(new
GroupPartitionId(ctx.groupId(), i));
+ }
+
+ CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+ final int topPartRefLimit = 5;
+
+ for (int i = 0; i < poolSize; i++) {
+ final int batchIdx = i;
+
+ sysPool.execute(() -> {
+ Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+ SortedSet<T3<Long, Long, GroupPartitionId>> top =
+ new
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+ while (!batch.isEmpty()) {
+ GroupPartitionId grpPartId = batch.poll();
+
+ CacheGroupContext grpCtx =
ctx.cache().cacheGroup(grpPartId.getGroupId());
+
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
- for (T3<Long, Long, GroupPartitionId> t2 :
top0) {
- top.add(t2);
+ top.add(new T3<>(time, U.currentTimeMillis(),
grpPartId));
Review comment:
Do this only if `log.isInfoEnabled()`.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ int poolSize = ctx.config().getSystemThreadPoolSize();
- totalProcessed.addAndGet(processed.size());
+ List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
- long ts = System.currentTimeMillis();
+ int cntr = 0;
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ // Group id -> completed partitions counter
+ Map<Integer, AtomicInteger> grps = new HashMap<>();
- trimToSize(top, 5);
- }
+ for (CacheGroupContext ctx : forGroups) {
+ grps.put(ctx.groupId(), new AtomicInteger());
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ for (int i = 0; i < ctx.affinity().partitions(); i++)
+ partIds.get(cntr++ % poolSize).add(new
GroupPartitionId(ctx.groupId(), i));
+ }
+
+ CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+ final int topPartRefLimit = 5;
+
+ for (int i = 0; i < poolSize; i++) {
+ final int batchIdx = i;
+
+ sysPool.execute(() -> {
+ Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+ SortedSet<T3<Long, Long, GroupPartitionId>> top =
+ new
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+ while (!batch.isEmpty()) {
Review comment:
You can replace the predicate with `(grpPartId = batch.poll()) != null`.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ int poolSize = ctx.config().getSystemThreadPoolSize();
- totalProcessed.addAndGet(processed.size());
+ List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
- long ts = System.currentTimeMillis();
+ int cntr = 0;
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ // Group id -> completed partitions counter
+ Map<Integer, AtomicInteger> grps = new HashMap<>();
Review comment:
`totalProcessed` can be replaced with the sum of all counters in this
map.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ int poolSize = ctx.config().getSystemThreadPoolSize();
- totalProcessed.addAndGet(processed.size());
+ List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
- long ts = System.currentTimeMillis();
+ int cntr = 0;
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ // Group id -> completed partitions counter
+ Map<Integer, AtomicInteger> grps = new HashMap<>();
- trimToSize(top, 5);
- }
+ for (CacheGroupContext ctx : forGroups) {
+ grps.put(ctx.groupId(), new AtomicInteger());
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ for (int i = 0; i < ctx.affinity().partitions(); i++)
+ partIds.get(cntr++ % poolSize).add(new
GroupPartitionId(ctx.groupId(), i));
+ }
+
+ CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+ final int topPartRefLimit = 5;
+
+ for (int i = 0; i < poolSize; i++) {
+ final int batchIdx = i;
+
+ sysPool.execute(() -> {
+ Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+ SortedSet<T3<Long, Long, GroupPartitionId>> top =
+ new
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+ while (!batch.isEmpty()) {
+ GroupPartitionId grpPartId = batch.poll();
+
+ CacheGroupContext grpCtx =
ctx.cache().cacheGroup(grpPartId.getGroupId());
+
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
- for (T3<Long, Long, GroupPartitionId> t2 :
top0) {
- top.add(t2);
+ top.add(new T3<>(time, U.currentTimeMillis(),
grpPartId));
- trimToSize(top, 5);
- }
+ trimToSize(top, topPartRefLimit);
Review comment:
Do this only if `log.isInfoEnabled()`.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5524,58 +5527,91 @@ private void restorePartitionStates(
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
-
AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ int poolSize = ctx.config().getSystemThreadPoolSize();
- totalProcessed.addAndGet(processed.size());
+ List<Queue<GroupPartitionId>> partIds = new ArrayList<>(poolSize);
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ for (int i = 0; i < poolSize; i++)
+ partIds.add(new LinkedList<>());
- long ts = System.currentTimeMillis();
+ int cntr = 0;
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ // Group id -> completed partitions counter
+ Map<Integer, AtomicInteger> grps = new HashMap<>();
- trimToSize(top, 5);
- }
+ for (CacheGroupContext ctx : forGroups) {
+ grps.put(ctx.groupId(), new AtomicInteger());
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ for (int i = 0; i < ctx.affinity().partitions(); i++)
+ partIds.get(cntr++ % poolSize).add(new
GroupPartitionId(ctx.groupId(), i));
+ }
+
+ CountDownLatch completionLatch = new CountDownLatch(cntr);
+
+ final int topPartRefLimit = 5;
+
+ for (int i = 0; i < poolSize; i++) {
+ final int batchIdx = i;
+
+ sysPool.execute(() -> {
+ Queue<GroupPartitionId> batch = partIds.get(batchIdx);
+
+ SortedSet<T3<Long, Long, GroupPartitionId>> top =
+ new
ConcurrentSkipListSet<>(processedPartitionComparator());
+
+ while (!batch.isEmpty()) {
+ GroupPartitionId grpPartId = batch.poll();
+
+ CacheGroupContext grpCtx =
ctx.cache().cacheGroup(grpPartId.getGroupId());
+
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
- for (T3<Long, Long, GroupPartitionId> t2 :
top0) {
- top.add(t2);
+ top.add(new T3<>(time, U.currentTimeMillis(),
grpPartId));
- trimToSize(top, 5);
- }
+ trimToSize(top, topPartRefLimit);
- return top;
- });
+ totalProcessed.incrementAndGet();
}
- }
- catch (IgniteCheckedException | RuntimeException | Error
e) {
- U.error(log, "Failed to restore partition state for " +
- "groupName=" + grp.name() + " groupId=" +
grp.groupId(), e);
+ catch (IgniteCheckedException | RuntimeException |
Error e) {
+ U.error(log, "Failed to restore partition state
for " +
+ "groupName=" + grpCtx.name() + " groupId=" +
grpCtx.groupId(), e);
- restoreStateError.compareAndSet(
- null,
- e instanceof IgniteCheckedException
+ IgniteCheckedException ex = e instanceof
IgniteCheckedException
? ((IgniteCheckedException)e)
- : new IgniteCheckedException(e)
- );
- }
- finally {
- completionLatch.countDown();
+ : new IgniteCheckedException(e);
+
+ if (!restoreStateError.compareAndSet(null, ex))
+ restoreStateError.get().addSuppressed(ex);
+ }
+ finally {
+ completionLatch.countDown();
+
+ AtomicInteger completedCntr =
grps.get(grpPartId.getGroupId());
+
+ if (completedCntr.incrementAndGet() ==
grpCtx.affinity().partitions())
+
grpCtx.offheap().confirmPartitionStatesRestored();
+
+ if (log.isInfoEnabled()) {
+ topPartRef.updateAndGet(top0 -> {
Review comment:
This code is executed for each partition, so the local collection
(`SortedSet<T3<Long, Long, GroupPartitionId>> top`) is not needed. I think this
will also be a place of high contention; perhaps you need to think about how to
reduce or remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]