tkalkirill commented on a change in pull request #9327:
URL: https://github.com/apache/ignite/pull/9327#discussion_r710129931
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
##########
@@ -89,16 +88,28 @@
*/
public void stop();
+ /**
+ * Pre-create single partition that resides in page memory or WAL and
restores their state.
+ *
+ * @param p Partition id.
+ * @param recoveryState Partition recovery state.
+ * @return Processing time in millis.
+ * @throws IgniteCheckedException If failed.
+ */
+ long restoreStateOfPartition(int p, @Nullable Integer recoveryState)
throws IgniteCheckedException;
Review comment:
Maybe a better name: `restorePartitionState`
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
+ Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new
ConcurrentHashMap<>();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ final int topPartRefLimit = 5;
- totalProcessed.addAndGet(processed.size());
+ for (CacheGroupContext grpCtx : forGroups) {
+ for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+ final int partId = i;
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ sysPool.execute(() -> {
+ GroupPartitionId grpPartId = new
GroupPartitionId(grpCtx.groupId(), partId);
- long ts = System.currentTimeMillis();
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
Review comment:
You can use `partId` instead of `grpPartId.getPartitionId()`.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
+ Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new
ConcurrentHashMap<>();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ final int topPartRefLimit = 5;
- totalProcessed.addAndGet(processed.size());
+ for (CacheGroupContext grpCtx : forGroups) {
+ for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+ final int partId = i;
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ sysPool.execute(() -> {
+ GroupPartitionId grpPartId = new
GroupPartitionId(grpCtx.groupId(), partId);
- long ts = System.currentTimeMillis();
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ if (log.isInfoEnabled()) {
+ T3<Long, Long, GroupPartitionId> curPart = new
T3<>(time, U.currentTimeMillis(), grpPartId);
- trimToSize(top, 5);
- }
+ RestorePartitionStateThreadContext threadCtx =
threadCtxs.computeIfAbsent(
+ Thread.currentThread(),
+ t -> new
RestorePartitionStateThreadContext()
+ );
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ Comparator<T3<Long, Long, GroupPartitionId>>
cmp = processedPartitionComparator();
- for (T3<Long, Long, GroupPartitionId> t2 :
top0) {
- top.add(t2);
+ threadCtx.topPartRef.updateAndGet(top0 -> {
+ if (threadCtx.topPartRef.get() == null ||
+
cmp.compare(threadCtx.topPartRef.get().last(), curPart) < 0) {
Review comment:
`threadCtx.topPartRef.get().last()` -> `prev.last()`
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
+ Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new
ConcurrentHashMap<>();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ final int topPartRefLimit = 5;
- totalProcessed.addAndGet(processed.size());
+ for (CacheGroupContext grpCtx : forGroups) {
+ for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+ final int partId = i;
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ sysPool.execute(() -> {
+ GroupPartitionId grpPartId = new
GroupPartitionId(grpCtx.groupId(), partId);
- long ts = System.currentTimeMillis();
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ if (log.isInfoEnabled()) {
+ T3<Long, Long, GroupPartitionId> curPart = new
T3<>(time, U.currentTimeMillis(), grpPartId);
- trimToSize(top, 5);
- }
+ RestorePartitionStateThreadContext threadCtx =
threadCtxs.computeIfAbsent(
+ Thread.currentThread(),
+ t -> new
RestorePartitionStateThreadContext()
+ );
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ Comparator<T3<Long, Long, GroupPartitionId>>
cmp = processedPartitionComparator();
- for (T3<Long, Long, GroupPartitionId> t2 :
top0) {
- top.add(t2);
+ threadCtx.topPartRef.updateAndGet(top0 -> {
+ if (threadCtx.topPartRef.get() == null ||
+
cmp.compare(threadCtx.topPartRef.get().last(), curPart) < 0) {
+ SortedSet<T3<Long, Long,
GroupPartitionId>> top = new TreeSet<>(cmp);
- trimToSize(top, 5);
- }
+ top.add(curPart);
- return top;
- });
+ if (top0 != null)
+ top.addAll(top0);
+
+ trimToSize(top, topPartRefLimit);
+
+ return top;
+ }
+ else
+ return top0;
+ });
+
+
RestorePartitionStateThreadContext.PROCESSED_CNT_UPD.incrementAndGet(threadCtx);
Review comment:
Let's make it a `RestorePartitionStateThreadContext` method.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
+ Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new
ConcurrentHashMap<>();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ final int topPartRefLimit = 5;
- totalProcessed.addAndGet(processed.size());
+ for (CacheGroupContext grpCtx : forGroups) {
+ for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+ final int partId = i;
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ sysPool.execute(() -> {
+ GroupPartitionId grpPartId = new
GroupPartitionId(grpCtx.groupId(), partId);
- long ts = System.currentTimeMillis();
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ if (log.isInfoEnabled()) {
+ T3<Long, Long, GroupPartitionId> curPart = new
T3<>(time, U.currentTimeMillis(), grpPartId);
- trimToSize(top, 5);
- }
+ RestorePartitionStateThreadContext threadCtx =
threadCtxs.computeIfAbsent(
+ Thread.currentThread(),
+ t -> new
RestorePartitionStateThreadContext()
+ );
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ Comparator<T3<Long, Long, GroupPartitionId>>
cmp = processedPartitionComparator();
- for (T3<Long, Long, GroupPartitionId> t2 :
top0) {
- top.add(t2);
+ threadCtx.topPartRef.updateAndGet(top0 -> {
Review comment:
Please rename `top0` -> `prev`.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
##########
@@ -5519,66 +5519,76 @@ private void restorePartitionStates(
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups.");
- AtomicLong totalProcessed = new AtomicLong();
-
AtomicReference<IgniteCheckedException> restoreStateError = new
AtomicReference<>();
ExecutorService sysPool = ctx.pools().getSystemExecutorService();
- CountDownLatch completionLatch = new
CountDownLatch(forGroups.size());
+ final int totalPart = forGroups.stream().mapToInt(grpCtx ->
grpCtx.affinity().partitions()).sum();
- AtomicReference<SortedSet<T3<Long, Long, GroupPartitionId>>>
topPartRef = new AtomicReference<>();
+ CountDownLatch completionLatch = new CountDownLatch(totalPart);
- long totalPart = forGroups.stream().mapToLong(grpCtx ->
grpCtx.affinity().partitions()).sum();
+ Map<Thread, RestorePartitionStateThreadContext> threadCtxs = new
ConcurrentHashMap<>();
- for (CacheGroupContext grp : forGroups) {
- sysPool.execute(() -> {
- try {
- Map<Integer, Long> processed =
grp.offheap().restorePartitionStates(partStates);
+ final int topPartRefLimit = 5;
- totalProcessed.addAndGet(processed.size());
+ for (CacheGroupContext grpCtx : forGroups) {
+ for (int i = 0; i < grpCtx.affinity().partitions(); i++) {
+ final int partId = i;
- if (log.isInfoEnabled()) {
- TreeSet<T3<Long, Long, GroupPartitionId>> top =
- new TreeSet<>(processedPartitionComparator());
+ sysPool.execute(() -> {
+ GroupPartitionId grpPartId = new
GroupPartitionId(grpCtx.groupId(), partId);
- long ts = System.currentTimeMillis();
+ try {
+ long time =
grpCtx.offheap().restoreStateOfPartition(grpPartId.getPartitionId(),
+ partStates.get(grpPartId));
- for (Map.Entry<Integer, Long> e :
processed.entrySet()) {
- top.add(new T3<>(e.getValue(), ts, new
GroupPartitionId(grp.groupId(), e.getKey())));
+ if (log.isInfoEnabled()) {
+ T3<Long, Long, GroupPartitionId> curPart = new
T3<>(time, U.currentTimeMillis(), grpPartId);
- trimToSize(top, 5);
- }
+ RestorePartitionStateThreadContext threadCtx =
threadCtxs.computeIfAbsent(
+ Thread.currentThread(),
+ t -> new
RestorePartitionStateThreadContext()
+ );
- topPartRef.updateAndGet(top0 -> {
- if (top0 == null)
- return top;
+ Comparator<T3<Long, Long, GroupPartitionId>>
cmp = processedPartitionComparator();
- for (T3<Long, Long, GroupPartitionId> t2 :
top0) {
- top.add(t2);
+ threadCtx.topPartRef.updateAndGet(top0 -> {
+ if (threadCtx.topPartRef.get() == null ||
Review comment:
Use `prev == null`
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
##########
@@ -89,16 +88,28 @@
*/
public void stop();
+ /**
+ * Pre-create single partition that resides in page memory or WAL and
restores their state.
+ *
+ * @param p Partition id.
+ * @param recoveryState Partition recovery state.
+ * @return Processing time in millis.
+ * @throws IgniteCheckedException If failed.
+ */
+ long restoreStateOfPartition(int p, @Nullable Integer recoveryState)
throws IgniteCheckedException;
+
/**
* Pre-create partitions that resides in page memory or WAL and restores
their state.
*
- * @param partRecoveryStates Partition recovery states.
- * @return Processed partitions: partition id -> processing time in millis.
* @throws IgniteCheckedException If failed.
*/
- Map<Integer, Long> restorePartitionStates(
- Map<GroupPartitionId, Integer> partRecoveryStates
- ) throws IgniteCheckedException;
+ void restorePartitionStates() throws IgniteCheckedException;
+
+ /**
+ * Confirm that partition states are restored. This method should be
called after restoring state of all partitions
+ * in group using {@link #restoreStateOfPartition(int, Integer)}.
+ */
+ void confirmPartitionStatesRestored();
Review comment:
It looks like a callback; consider renaming it to `onRestorePartitionStates`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]