alex-plekhanov commented on a change in pull request #8846:
URL: https://github.com/apache/ignite/pull/8846#discussion_r600404995
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
##########
@@ -713,31 +713,34 @@ public void schedulePartitionDestroy(@Nullable
CacheGroupContext grpCtx, int grp
/**
* @param grpId Group ID.
* @param partId Partition ID.
+ * @return {@code True} if the request to destroy the partition was
canceled.
*/
- public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws
IgniteCheckedException {
+ public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws
IgniteCheckedException {
PartitionDestroyRequest req;
+ boolean canceled = false;
synchronized (this) {
- req = scheduledCp.getDestroyQueue().cancelDestroy(grpId, partId);
- }
-
- if (req != null)
- req.waitCompleted();
+ req = scheduledCp.getDestroyQueue().removeRequest(grpId, partId);
- CheckpointProgressImpl cur;
+ if (req == null) {
+ CheckpointProgressImpl cur = curCpProgress;
- synchronized (this) {
- cur = curCpProgress;
+ if (cur != null)
+ req = cur.getDestroyQueue().removeRequest(grpId, partId);
Review comment:
Can we have the same partition destroy request in both scheduledCp and
curCpProgress?
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/managers/encryption/CacheGroupPageScanner.java
##########
@@ -442,74 +485,88 @@ public long remainingPagesCount() {
return remainingPagesCntr.get();
}
- /** {@inheritDoc} */
- @Override public void run() {
- try {
- for (int partId : parts) {
- long state =
ctx.encryption().getEncryptionState(grp.groupId(), partId);
-
- if (state == 0)
- continue;
-
- scanPartition(partId,
ReencryptStateUtils.pageIndex(state), ReencryptStateUtils.pageCount(state));
+ /**
+ * @param partId Partition ID.
+ */
+ private void schedulePartitionScan(int partId) {
+ singleExecSvc.submit(() -> scanPartition(partId));
+ }
- if (isDone())
- return;
- }
+ /**
+ * Check the completeness of the cache group scan.
+ */
+ private synchronized void checkComplete() {
+ if (!isDone() && parts.isEmpty() && !cpWaitGrps.contains(this))
+ cpWaitGrps.add(this);
+ }
- boolean added = cpWaitGrps.add(this);
+ /**
+ * @param partId Partition ID.
+ * @return {@code True} if partition has been evicted.
+ */
+ private boolean evicted(int partId) {
+ if (partId == PageIdAllocator.INDEX_PARTITION)
+ return false;
- assert added;
- }
- catch (Throwable t) {
- if (X.hasCause(t, NodeStoppingException.class))
- onCancelled();
- else
- onDone(t);
- }
+ return !parts.contains(partId) ||
+ grp.topology().localPartition(partId).state() ==
GridDhtPartitionState.EVICTED;
}
/**
* @param partId Partition ID.
- * @param off Start page offset.
- * @param cnt Count of pages to scan.
*/
- private void scanPartition(int partId, int off, int cnt) throws
IgniteCheckedException {
+ private void scanPartition(int partId) {
+ long state = ctx.encryption().getEncryptionState(grp.groupId(),
partId);
+
+ int off = ReencryptStateUtils.pageIndex(state);
+ int cnt = ReencryptStateUtils.pageCount(state);
+
if (log.isDebugEnabled()) {
log.debug("Partition reencryption is started [grpId=" +
grp.groupId() +
", p=" + partId + ", remain=" + (cnt - off) + ", total=" +
cnt + "]");
}
- while (off < cnt) {
- int pagesCnt = Math.min(batchSize, cnt - off);
+ try {
+ while (off < cnt) {
+ int pagesCnt = Math.min(batchSize, cnt - off);
- limiter.acquire(pagesCnt);
+ limiter.acquire(pagesCnt);
- synchronized (this) {
- if (isDone() || !parts.contains(partId))
- break;
+ synchronized (this) {
+ if (isDone() || evicted(partId))
Review comment:
Let's move the evicted check under the checkpoint read lock. Otherwise, the
current implementation can still hit a race with the same outcome as the one
fixed by this patch.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
##########
@@ -713,31 +713,34 @@ public void schedulePartitionDestroy(@Nullable
CacheGroupContext grpCtx, int grp
/**
* @param grpId Group ID.
* @param partId Partition ID.
+ * @return {@code True} if the request to destroy the partition was
canceled.
*/
- public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws
IgniteCheckedException {
+ public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws
IgniteCheckedException {
PartitionDestroyRequest req;
+ boolean canceled = false;
synchronized (this) {
- req = scheduledCp.getDestroyQueue().cancelDestroy(grpId, partId);
- }
-
- if (req != null)
- req.waitCompleted();
+ req = scheduledCp.getDestroyQueue().removeRequest(grpId, partId);
- CheckpointProgressImpl cur;
+ if (req == null) {
+ CheckpointProgressImpl cur = curCpProgress;
- synchronized (this) {
- cur = curCpProgress;
+ if (cur != null)
+ req = cur.getDestroyQueue().removeRequest(grpId, partId);
+ }
- if (cur != null)
- req = cur.getDestroyQueue().cancelDestroy(grpId, partId);
+ if (req != null)
+ canceled = req.cancel();
}
- if (req != null)
- req.waitCompleted();
+ if (req != null) {
+ if (!canceled)
+ req.waitCompleted();
+ else if (log.isInfoEnabled())
+ log.info("Partition file destroy has cancelled [grpId=" +
grpId + ", partId=" + partId + "]");
Review comment:
Let's keep the log level as is.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]