[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r337077458
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
 ##
 @@ -791,10 +951,312 @@ private int calculateBucket() {
 switch (QUEUE_TYPE) {
 case 1:
 return new PriorityBlockingQueue<>(
-1000, Comparator.comparingLong(p -> 
p.part.fullSize()));
+1000,
+Comparator.comparing((AbstractEvictionTask t) -> 
t.id.type.ordinal())
+.reversed()
+.thenComparing((AbstractEvictionTask t) -> t.part 
== null ? 0 : t.part.fullSize()));
 default:
 return new LinkedBlockingQueue<>();
 }
 }
 }
+
+/**
+ * Context of exclusive partition purge.
+ */
+private class ExclusivePurgeFuture extends GridFutureAdapter {
+/** Group eviction context. */
+GroupEvictionContext grpEvictionCtx;
+
+/** Partition ids. */
+Set partIds;
+
+/** Partitions. */
+List parts;
+
+/** All partitions initialized counter. */
+AtomicInteger initCounter;
+
+/** */
+AtomicInteger resultCounter;
+
+/** Tasks future */
+GridCompoundFuture resFut = new GridCompoundFuture<>();
+
+/** Allows to cancel index tasks. */
+GridCompoundFuture cancelFut = new GridCompoundFuture<>();
+
+/**
+ *
+ * @param grpEvictionCtx Group eviction context.
+ * @param parts Partitions.
+ */
+public ExclusivePurgeFuture(GroupEvictionContext grpEvictionCtx, 
List parts) {
+this.grpEvictionCtx = grpEvictionCtx;
+this.parts = parts;
+
+partIds = Collections.unmodifiableSet(
+
parts.stream().mapToInt(GridDhtLocalPartition::id).boxed().collect(Collectors.toSet()));
+
+initCounter = new AtomicInteger(parts.size());
+
+resultCounter = new AtomicInteger(parts.size());
+}
+
+/**
+ * Initializes partition for exclusive purge.
+ *
+ * @param part Partition.
+ */
+public void onInit(GridDhtLocalPartition part) {
+if (log.isInfoEnabled())
+log.info("Partition confirmed exclusive purge. [partId=" + 
part.id() + "]");
+
+part.onClearFinished(f -> {
+if (resultCounter.decrementAndGet() == 0) {
+assert resFut.isDone();
+
+if (resFut.error() == null)
+onDone();
+else
+onDone(resFut.error());
+}
+});
+
+if (initCounter.decrementAndGet() == 0)
+initTasks();
+}
+
+/**
+ * Initializes tasks.
+ */
+private void initTasks() {
+List tasks = new ArrayList<>();
+List idxTasks = new ArrayList<>();
+
+for (GridDhtLocalPartition p : parts) {
+ClearOnheapEntriesTask task = new 
ClearOnheapEntriesTask(grpEvictionCtx, p);
+
+tasks.add(task);
+
+resFut.add(task.finishFut);
+}
+
+CacheGroupContext grp = grpEvictionCtx.grp;
+
+GridCompoundFuture idxFut = null;
+
+if (grp.shared().kernalContext().query().moduleEnabled()) {
+GridQueryIndexing idx = 
grp.shared().kernalContext().query().getIndexing();
+
+GridQueryRowCacheCleaner rowCacheCleaner = 
idx.rowCacheCleaner(grp.groupId());
+
+if (rowCacheCleaner != null) {
+RowCacheCleanerTask task = new 
RowCacheCleanerTask(grpEvictionCtx, rowCacheCleaner, partIds);
+
+tasks.add(task);
+
+resFut.add(task.finishFut);
+}
+
+List>> 
idxPurgeList =
+idx.purgeIndexPartitions(grp, partIds);
+
+assert idxPurgeList != null;
+
+if (!idxPurgeList.isEmpty()) {
+idxFut = new GridCompoundFuture<>();
+
+for (IgniteBiTuple> r 
: idxPurgeList) {
+AbstractEvictionTask idxTask = new 
IndexPurgeTask(grpEvictionCtx, r.get1(), r.get2());
+
+cancelFut.add(r.get2());
+
+idxFut.add(idxTask.finishFut);
+
+idxTasks.add(idxTask);
+}
+
+cancelFut.markInitialized();
+
+idxFut.markInitialized();
+
+resFut.add(idxFut);
+}
+}
+
+resFut.markInitialized();
+
+

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r337076921
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
 ##
 @@ -791,10 +951,312 @@ private int calculateBucket() {
 switch (QUEUE_TYPE) {
 case 1:
 return new PriorityBlockingQueue<>(
-1000, Comparator.comparingLong(p -> 
p.part.fullSize()));
+1000,
+Comparator.comparing((AbstractEvictionTask t) -> 
t.id.type.ordinal())
+.reversed()
+.thenComparing((AbstractEvictionTask t) -> t.part 
== null ? 0 : t.part.fullSize()));
 default:
 return new LinkedBlockingQueue<>();
 }
 }
 }
+
+/**
+ * Context of exclusive partition purge.
+ */
+private class ExclusivePurgeFuture extends GridFutureAdapter {
+/** Group eviction context. */
+GroupEvictionContext grpEvictionCtx;
+
+/** Partition ids. */
+Set partIds;
+
+/** Partitions. */
+List parts;
+
+/** All partitions initialized counter. */
+AtomicInteger initCounter;
+
+/** */
+AtomicInteger resultCounter;
+
+/** Tasks future */
+GridCompoundFuture resFut = new GridCompoundFuture<>();
+
+/** Allows to cancel index tasks. */
+GridCompoundFuture cancelFut = new GridCompoundFuture<>();
+
+/**
+ *
+ * @param grpEvictionCtx Group eviction context.
+ * @param parts Partitions.
+ */
+public ExclusivePurgeFuture(GroupEvictionContext grpEvictionCtx, 
List parts) {
+this.grpEvictionCtx = grpEvictionCtx;
+this.parts = parts;
+
+partIds = Collections.unmodifiableSet(
+
parts.stream().mapToInt(GridDhtLocalPartition::id).boxed().collect(Collectors.toSet()));
+
+initCounter = new AtomicInteger(parts.size());
+
+resultCounter = new AtomicInteger(parts.size());
+}
+
+/**
+ * Initializes partition for exclusive purge.
+ *
+ * @param part Partition.
+ */
+public void onInit(GridDhtLocalPartition part) {
+if (log.isInfoEnabled())
+log.info("Partition confirmed exclusive purge. [partId=" + 
part.id() + "]");
+
+part.onClearFinished(f -> {
+if (resultCounter.decrementAndGet() == 0) {
+assert resFut.isDone();
+
+if (resFut.error() == null)
+onDone();
+else
+onDone(resFut.error());
+}
+});
+
+if (initCounter.decrementAndGet() == 0)
+initTasks();
+}
+
+/**
+ * Initializes tasks.
+ */
+private void initTasks() {
+List tasks = new ArrayList<>();
+List idxTasks = new ArrayList<>();
+
+for (GridDhtLocalPartition p : parts) {
+ClearOnheapEntriesTask task = new 
ClearOnheapEntriesTask(grpEvictionCtx, p);
+
+tasks.add(task);
+
+resFut.add(task.finishFut);
+}
+
+CacheGroupContext grp = grpEvictionCtx.grp;
+
+GridCompoundFuture idxFut = null;
+
+if (grp.shared().kernalContext().query().moduleEnabled()) {
+GridQueryIndexing idx = 
grp.shared().kernalContext().query().getIndexing();
+
+GridQueryRowCacheCleaner rowCacheCleaner = 
idx.rowCacheCleaner(grp.groupId());
+
+if (rowCacheCleaner != null) {
+RowCacheCleanerTask task = new 
RowCacheCleanerTask(grpEvictionCtx, rowCacheCleaner, partIds);
+
+tasks.add(task);
+
+resFut.add(task.finishFut);
+}
+
+List>> 
idxPurgeList =
+idx.purgeIndexPartitions(grp, partIds);
+
+assert idxPurgeList != null;
+
+if (!idxPurgeList.isEmpty()) {
+idxFut = new GridCompoundFuture<>();
+
+for (IgniteBiTuple> r 
: idxPurgeList) {
+AbstractEvictionTask idxTask = new 
IndexPurgeTask(grpEvictionCtx, r.get1(), r.get2());
+
+cancelFut.add(r.get2());
+
+idxFut.add(idxTask.finishFut);
+
+idxTasks.add(idxTask);
+}
+
+cancelFut.markInitialized();
+
+idxFut.markInitialized();
+
+resFut.add(idxFut);
+}
+}
+
+resFut.markInitialized();
+
+

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r337059220
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##
 @@ -1826,6 +1827,23 @@ public final boolean removex(L row) throws 
IgniteCheckedException {
 return res != null ? res : false;
 }
 
+/**
+ * Returns a cursor that removes rows when being iterated.
+ * Uses a specified closure as a predicate to filter rows.
+ *
+ * @param clo TreeRowClosure Closure to filter rows.
+ * @throws IgniteCheckedException If failed.
+ */
+public GridCursor purge(TreeRowClosure clo) throws 
IgniteCheckedException {
 
 Review comment:
   I don't get why we should share a "cursor that removes rows when being 
iterated".
   Can we implement a regular `Iterator` with support for the `remove` operation?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r337052694
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
 ##
 @@ -133,6 +151,81 @@ public void evictPartitionAsync(CacheGroupContext grp, 
GridDhtLocalPartition par
 }
 }
 
+/**
+ *
+ * @param grp Cache group context.
+ * @param parts Partitions.
+ * @throws IgniteCheckedException If failed.
+ * @return Future.
+ */
+public IgniteInternalFuture 
purgePartitionsExclusively(CacheGroupContext grp,
+List parts) throws IgniteCheckedException {
+validateCacheGroupForExclusivePurge(grp);
+
+if (F.isEmpty(parts))
+return new GridFinishedFuture<>();
+
+GroupEvictionContext grpEvictionCtx = 
evictionGroupsMap.computeIfAbsent(
+grp.groupId(), (k) -> new GroupEvictionContext(grp));
+
+ExclusivePurgeFuture fut = new ExclusivePurgeFuture(grpEvictionCtx, 
parts);
+
+synchronized (mux) {
+if (grpEvictionCtx.exclPurgeFut != null)
+throw new IgniteCheckedException("Only one exclusive purge can 
be scheduled for the same cache group." +
+" [grpId=" + grp.groupId() + "]");
+
+Set res = new HashSet<>();
+
+for (TaskId taskId : grpEvictionCtx.taskIds)
+res.add(taskId.part);
+
+res.retainAll(fut.partIds);
+
+if (!res.isEmpty())
+throw new IgniteCheckedException("Can't schedule exclusive 
purge for a partition " +
 
 Review comment:
   1. We should add {} braces for each multi-line if branch.
   2. How do we expect the caller to handle the IgniteCheckedException we throw?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r337002778
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/PartitionsEvictManager.java
 ##
 @@ -133,6 +151,81 @@ public void evictPartitionAsync(CacheGroupContext grp, 
GridDhtLocalPartition par
 }
 }
 
+/**
+ *
+ * @param grp Cache group context.
+ * @param parts Partitions.
+ * @throws IgniteCheckedException If failed.
+ * @return Future.
+ */
+public IgniteInternalFuture 
purgePartitionsExclusively(CacheGroupContext grp,
+List parts) throws IgniteCheckedException {
+validateCacheGroupForExclusivePurge(grp);
+
+if (F.isEmpty(parts))
+return new GridFinishedFuture<>();
+
+GroupEvictionContext grpEvictionCtx = 
evictionGroupsMap.computeIfAbsent(
+grp.groupId(), (k) -> new GroupEvictionContext(grp));
+
+ExclusivePurgeFuture fut = new ExclusivePurgeFuture(grpEvictionCtx, 
parts);
+
+synchronized (mux) {
+if (grpEvictionCtx.exclPurgeFut != null)
 
 Review comment:
   1. We should add {} braces for each multi-line if branch.
   
   2. How do we expect the caller to handle the `IgniteCheckedException` we throw?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336988779
 
 

 ##
 File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
 ##
 @@ -676,4 +681,61 @@ public boolean created() {
 
 return e;
 }
+
+/**
+ * Purge all rows that belong to specific partitions from the index.
+ *
+ * @param parts Partitions.
+ * @param shouldStop Allows to check stop condition.
+ */
+public void purge(Set parts, IgniteCallable shouldStop) 
throws IgniteCheckedException {
+assert !F.isEmpty(parts) : "empty parts";
+assert shouldStop != null : "shouldStop is null";
 
 Review comment:
   Do we need this comment?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336987839
 
 

 ##
 File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
 ##
 @@ -887,6 +890,32 @@ public GridH2RowMessage toSearchRowMessage(SearchRow row) 
{
 return res;
 }
 
+/**
+ * Removes all index rows that belong to the specific data partitions.
+ *
+ * @param parts Partitions.
+ * @param shouldStop Allows to check stop condition.
+ */
+public void purge(Set parts, IgniteCallable shouldStop) 
throws IgniteCheckedException {
+if (F.isEmpty(parts))
+return;
+
+if (segments.length == 1)
 
 Review comment:
   What do we gain with this optimization?
   Can we simply keep the for loop to make this code clearer?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336986892
 
 

 ##
 File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
 ##
 @@ -887,6 +890,32 @@ public GridH2RowMessage toSearchRowMessage(SearchRow row) 
{
 return res;
 }
 
+/**
+ * Removes all index rows that belong to the specific data partitions.
+ *
+ * @param parts Partitions.
+ * @param shouldStop Allows to check stop condition.
+ */
+public void purge(Set parts, IgniteCallable shouldStop) 
throws IgniteCheckedException {
+if (F.isEmpty(parts))
 
 Review comment:
   We can't get here if `parts` is empty.
   It seems we should replace this check with an assert.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336985055
 
 

 ##
 File path: 
modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
 ##
 @@ -676,4 +681,61 @@ public boolean created() {
 
 return e;
 }
+
+/**
+ * Purge all rows that belong to specific partitions from the index.
+ *
+ * @param parts Partitions.
+ * @param shouldStop Allows to check stop condition.
+ */
+public void purge(Set parts, IgniteCallable shouldStop) 
throws IgniteCheckedException {
+assert !F.isEmpty(parts) : "empty parts";
 
 Review comment:
   Sentences should start with a capital letter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336984067
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##
 @@ -5728,6 +5746,332 @@ protected abstract Result run0(long pageId, long page, 
long pageAddr, BPlusIO
 }
 }
 
+/**
+ * Cursor that filters and removes rows.
+ *
+ * Most rows are expected to be deleted "in place" on a leaf page.
+ * The last row and the rightmost row on a page are deleted starting from 
root.
+ */
+private class PurgeCursor implements GridCursor {
+/** Row filter. */
+private final TreeRowClosure clo;
+
+/** Current page id. */
+private long curPageId = 0L;
+
+/** Last row to get back to if current page turns out removed. */
+private L lastRow = null;
+
+/** Row to remove starting "from root". */
+private L removeRow = null;
+
+/**
+ * Constructor.
+ *
+ * @param clo Row filter.
+ */
+PurgeCursor(TreeRowClosure clo) {
+this.clo = clo;
+}
+
+/**
+ * Initializes the cursor.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+void start() throws IgniteCheckedException {
+checkDestroyed();
+
+long metaPage = acquirePage(metaPageId);
+try {
+curPageId = getFirstPageId(metaPageId, metaPage, 0);
+}
+finally {
+releasePage(metaPageId, metaPage);
+}
+}
+
+/** {@inheritDoc} */
+@Override public boolean next() throws IgniteCheckedException {
+checkDestroyed();
+
+if (curPageId == 0L && lastRow == null)
+return false;
+
+return next0(curPageId);
+}
+
+/** {@inheritDoc} */
+@Override public Void get() throws IgniteCheckedException {
+checkDestroyed();
+
+try {
+doPurge(curPageId);
+
+if (removeRow != null) {
+doRemove(removeRow, false);
+
+lastRow = removeRow;
+removeRow = null;
+}
+}
+catch (RuntimeException | AssertionError e) {
+throw new BPlusTreeRuntimeException(e, grpId);
+}
+
+return null;
+}
+
+/**
+ * Continues scanning the leaf pages of the tree.
+ *
+ * @param pageId Page id to start with.
+ * @return {@code true} if there is a row(s) to remove.
+ * @throws IgniteCheckedException If failed.
+ */
+private boolean next0(long pageId) throws IgniteCheckedException {
+for (;;) {
+if (pageId == 0L && lastRow != null) {
+ReinitHelper reinit = new ReinitHelper(lastRow);
+
+doFind(reinit);
+
+pageId = reinit.pageId();
+}
+
+long page = acquirePage(pageId);
+try {
+long pageAddr = readLock(pageId, page);
+
+if (pageAddr == 0L) { // this page is deleted. Find from 
root using lastRow.
+pageId = 0L;
+
+continue;
+}
+
+try {
+for (;;) {
+BPlusIO io = io(pageAddr);
+
+assert io.isLeaf();
+
+long nextPageId = io.getForward(pageAddr);
+
+int cnt = io.getCount(pageAddr);
+
+if (cnt == 0) {
+assert nextPageId == 0L;
+
+return false;
+}
+
+for (int idx = 0; idx < cnt; idx++) {
+if (clo.apply(BPlusTree.this, io, pageAddr, 
idx)) {
+lastRow = io.getLookupRow(BPlusTree.this, 
pageAddr, cnt - 1);
+curPageId = pageId;
+
+return true;
+}
+}
+lastRow = null;
+
+if (nextPageId == 0L)
+return false;
+
+long nextPage = acquirePage(nextPageId);
+try {
+long nextPageAddr = readLock(nextPageId, 
nextPage);
+
+assert nextPageAddr != 0L : "next page removed 
while back page is being locked";
+
+try {
+long pa 

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336983284
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##
 @@ -5728,6 +5746,332 @@ protected abstract Result run0(long pageId, long page, 
long pageAddr, BPlusIO
 }
 }
 
+/**
+ * Cursor that filters and removes rows.
+ *
+ * Most rows are expected to be deleted "in place" on a leaf page.
+ * The last row and the rightmost row on a page are deleted starting from 
root.
+ */
+private class PurgeCursor implements GridCursor {
+/** Row filter. */
+private final TreeRowClosure clo;
+
+/** Current page id. */
+private long curPageId = 0L;
+
+/** Last row to get back to if current page turns out removed. */
+private L lastRow = null;
+
+/** Row to remove starting "from root". */
+private L removeRow = null;
+
+/**
+ * Constructor.
+ *
+ * @param clo Row filter.
+ */
+PurgeCursor(TreeRowClosure clo) {
+this.clo = clo;
+}
+
+/**
+ * Initializes the cursor.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+void start() throws IgniteCheckedException {
+checkDestroyed();
+
+long metaPage = acquirePage(metaPageId);
+try {
+curPageId = getFirstPageId(metaPageId, metaPage, 0);
+}
+finally {
+releasePage(metaPageId, metaPage);
+}
+}
+
+/** {@inheritDoc} */
+@Override public boolean next() throws IgniteCheckedException {
+checkDestroyed();
+
+if (curPageId == 0L && lastRow == null)
+return false;
+
+return next0(curPageId);
+}
+
+/** {@inheritDoc} */
+@Override public Void get() throws IgniteCheckedException {
+checkDestroyed();
+
+try {
+doPurge(curPageId);
+
+if (removeRow != null) {
+doRemove(removeRow, false);
+
+lastRow = removeRow;
+removeRow = null;
+}
+}
+catch (RuntimeException | AssertionError e) {
+throw new BPlusTreeRuntimeException(e, grpId);
+}
+
+return null;
+}
+
+/**
+ * Continues scanning the leaf pages of the tree.
+ *
+ * @param pageId Page id to start with.
+ * @return {@code true} if there is a row(s) to remove.
+ * @throws IgniteCheckedException If failed.
+ */
+private boolean next0(long pageId) throws IgniteCheckedException {
+for (;;) {
+if (pageId == 0L && lastRow != null) {
+ReinitHelper reinit = new ReinitHelper(lastRow);
+
+doFind(reinit);
+
+pageId = reinit.pageId();
+}
+
+long page = acquirePage(pageId);
+try {
+long pageAddr = readLock(pageId, page);
+
+if (pageAddr == 0L) { // this page is deleted. Find from 
root using lastRow.
+pageId = 0L;
+
+continue;
+}
+
+try {
+for (;;) {
+BPlusIO io = io(pageAddr);
+
+assert io.isLeaf();
+
+long nextPageId = io.getForward(pageAddr);
+
+int cnt = io.getCount(pageAddr);
+
+if (cnt == 0) {
+assert nextPageId == 0L;
+
+return false;
+}
+
+for (int idx = 0; idx < cnt; idx++) {
+if (clo.apply(BPlusTree.this, io, pageAddr, 
idx)) {
+lastRow = io.getLookupRow(BPlusTree.this, 
pageAddr, cnt - 1);
+curPageId = pageId;
+
+return true;
+}
+}
+lastRow = null;
+
+if (nextPageId == 0L)
+return false;
+
+long nextPage = acquirePage(nextPageId);
+try {
+long nextPageAddr = readLock(nextPageId, 
nextPage);
+
+assert nextPageAddr != 0L : "next page removed 
while back page is being locked";
+
+try {
+long pa 

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336983048
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##
 @@ -5728,6 +5746,332 @@ protected abstract Result run0(long pageId, long page, 
long pageAddr, BPlusIO
 }
 }
 
+/**
+ * Cursor that filters and removes rows.
+ *
+ * Most rows are expected to be deleted "in place" on a leaf page.
+ * The last row and the rightmost row on a page are deleted starting from 
root.
+ */
+private class PurgeCursor implements GridCursor {
+/** Row filter. */
+private final TreeRowClosure clo;
+
+/** Current page id. */
+private long curPageId = 0L;
+
+/** Last row to get back to if current page turns out removed. */
+private L lastRow = null;
+
+/** Row to remove starting "from root". */
+private L removeRow = null;
+
+/**
+ * Constructor.
+ *
+ * @param clo Row filter.
+ */
+PurgeCursor(TreeRowClosure clo) {
+this.clo = clo;
+}
+
+/**
+ * Initializes the cursor.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+void start() throws IgniteCheckedException {
+checkDestroyed();
+
+long metaPage = acquirePage(metaPageId);
+try {
+curPageId = getFirstPageId(metaPageId, metaPage, 0);
+}
+finally {
+releasePage(metaPageId, metaPage);
+}
+}
+
+/** {@inheritDoc} */
+@Override public boolean next() throws IgniteCheckedException {
+checkDestroyed();
+
+if (curPageId == 0L && lastRow == null)
+return false;
+
+return next0(curPageId);
+}
+
+/** {@inheritDoc} */
+@Override public Void get() throws IgniteCheckedException {
+checkDestroyed();
+
+try {
+doPurge(curPageId);
+
+if (removeRow != null) {
+doRemove(removeRow, false);
+
+lastRow = removeRow;
+removeRow = null;
+}
+}
+catch (RuntimeException | AssertionError e) {
+throw new BPlusTreeRuntimeException(e, grpId);
+}
+
+return null;
+}
+
+/**
+ * Continues scanning the leaf pages of the tree.
+ *
+ * @param pageId Page id to start with.
+ * @return {@code true} if there is a row(s) to remove.
+ * @throws IgniteCheckedException If failed.
+ */
+private boolean next0(long pageId) throws IgniteCheckedException {
+for (;;) {
+if (pageId == 0L && lastRow != null) {
+ReinitHelper reinit = new ReinitHelper(lastRow);
+
+doFind(reinit);
+
+pageId = reinit.pageId();
+}
+
+long page = acquirePage(pageId);
+try {
+long pageAddr = readLock(pageId, page);
+
+if (pageAddr == 0L) { // this page is deleted. Find from 
root using lastRow.
+pageId = 0L;
+
+continue;
+}
+
+try {
+for (;;) {
+BPlusIO io = io(pageAddr);
+
+assert io.isLeaf();
+
+long nextPageId = io.getForward(pageAddr);
+
+int cnt = io.getCount(pageAddr);
+
+if (cnt == 0) {
+assert nextPageId == 0L;
+
+return false;
+}
+
+for (int idx = 0; idx < cnt; idx++) {
+if (clo.apply(BPlusTree.this, io, pageAddr, 
idx)) {
+lastRow = io.getLookupRow(BPlusTree.this, 
pageAddr, cnt - 1);
+curPageId = pageId;
+
+return true;
+}
+}
+lastRow = null;
+
+if (nextPageId == 0L)
+return false;
+
+long nextPage = acquirePage(nextPageId);
+try {
+long nextPageAddr = readLock(nextPageId, 
nextPage);
+
+assert nextPageAddr != 0L : "next page removed 
while back page is being locked";
+
+try {
+long pa 

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336940474
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PurgeRecord.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Removal of multiple items (designated by their 0-based indexes) in an index 
leaf page.
+ */
+public class PurgeRecord extends PageDeltaRecord {
+/** Indexes of items to remove from the page. */
+private int[] items;
+
+/** Number of used elements in items array. */
+private int itemsCnt;
+
+/** Resulting count of items that should remain on the page. */
+private int cnt;
+
+/**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param items Indexes of items to remove on the page.
+ * @param itemsCnt Number of used elements in {@code items} array.
+ * @param cnt Resulting count of items that should remain on the page.
+ */
+public PurgeRecord(int grpId, long pageId, int[] items, int itemsCnt, int 
cnt) {
+super(grpId, pageId);
+
+assert itemsCnt > 0 && itemsCnt <= items.length;
+
+this.items = items;
+this.itemsCnt = itemsCnt;
+this.cnt = cnt;
+}
+
+/** {@inheritDoc} */
+@Override public void applyDelta(PageMemory pageMem, long pageAddr) throws 
IgniteCheckedException {
+BPlusIO io = PageIO.getBPlusIO(pageAddr);
+
+int cnt0 = io.getCount(pageAddr);
+
+assert cnt0 == cnt + itemsCnt : "unexpected count: cnt0=" + cnt0 + ", 
cnt=" + cnt + ", itemsCnt=" + itemsCnt;
+
+for (int i = 0; i < itemsCnt; ++i) {
 
 Review comment:
   Why is it "++i" instead of the regular "i++"?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336939868
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PurgeRecord.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Removal of multiple items (designated by their 0-based indexes) in an index 
leaf page.
+ */
+public class PurgeRecord extends PageDeltaRecord {
+/** Indexes of items to remove from the page. */
+private int[] items;
+
+/** Number of used elements in items array. */
+private int itemsCnt;
+
+/** Resulting count of items that should remain on the page. */
+private int cnt;
+
+/**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param items Indexes of items to remove on the page.
+ * @param itemsCnt Number of used elements in {@code items} array.
+ * @param cnt Resulting count of items that should remain on the page.
+ */
+public PurgeRecord(int grpId, long pageId, int[] items, int itemsCnt, int 
cnt) {
+super(grpId, pageId);
+
+assert itemsCnt > 0 && itemsCnt <= items.length;
+
+this.items = items;
+this.itemsCnt = itemsCnt;
+this.cnt = cnt;
+}
+
+/** {@inheritDoc} */
+@Override public void applyDelta(PageMemory pageMem, long pageAddr) throws 
IgniteCheckedException {
+BPlusIO io = PageIO.getBPlusIO(pageAddr);
+
+int cnt0 = io.getCount(pageAddr);
+
+assert cnt0 == cnt + itemsCnt : "unexpected count: cnt0=" + cnt0 + ", 
cnt=" + cnt + ", itemsCnt=" + itemsCnt;
 
 Review comment:
   This is already checked by `PageIO.getBPlusIO`, so we don't need an additional 
check here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336926788
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PurgeRecord.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Removal of multiple items (designated by their 0-based indexes) in an index 
leaf page.
+ */
+public class PurgeRecord extends PageDeltaRecord {
+/** Indexes of items to remove from the page. */
+private int[] items;
+
+/** Number of used elements in items array. */
+private int itemsCnt;
+
+/** Resulting count of items that should remain on the page. */
+private int cnt;
 
 Review comment:
   For now, we use `cnt` only for an assertion.
   I think we shouldn't store data on the disk if it is used only for an 
assertion that can be disabled in a production environment.
   
   Please clarify why we need this check:
   
   ```
   assert cnt0 == cnt + itemsCnt : "unexpected count: cnt0=" + cnt0 + ", cnt=" 
+ cnt + ", itemsCnt=" + itemsCnt;
   ```
   
   If we really need this check, it should not be an `assert` but a regular `if` 
statement that throws an exception when the check fails.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336924085
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##
 @@ -5728,6 +5746,332 @@ protected abstract Result run0(long pageId, long page, 
long pageAddr, BPlusIO
 }
 }
 
+/**
+ * Cursor that filters and removes rows.
+ *
+ * Most rows are expected to be deleted "in place" on a leaf page.
+ * The last row and the rightmost row on a page are deleted starting from 
root.
+ */
+private class PurgeCursor implements GridCursor {
+/** Row filter. */
+private final TreeRowClosure clo;
+
+/** Current page id. */
+private long curPageId = 0L;
+
+/** Last row to get back to if current page turns out removed. */
+private L lastRow = null;
+
+/** Row to remove starting "from root". */
+private L removeRow = null;
+
+/**
+ * Constructor.
+ *
+ * @param clo Row filter.
+ */
+PurgeCursor(TreeRowClosure clo) {
+this.clo = clo;
+}
+
+/**
+ * Initializes the cursor.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+void start() throws IgniteCheckedException {
+checkDestroyed();
+
+long metaPage = acquirePage(metaPageId);
+try {
+curPageId = getFirstPageId(metaPageId, metaPage, 0);
+}
+finally {
+releasePage(metaPageId, metaPage);
+}
+}
+
+/** {@inheritDoc} */
+@Override public boolean next() throws IgniteCheckedException {
+checkDestroyed();
+
+if (curPageId == 0L && lastRow == null)
+return false;
+
+return next0(curPageId);
+}
+
+/** {@inheritDoc} */
+@Override public Void get() throws IgniteCheckedException {
+checkDestroyed();
+
+try {
+doPurge(curPageId);
+
+if (removeRow != null) {
+doRemove(removeRow, false);
+
+lastRow = removeRow;
+removeRow = null;
+}
+}
+catch (RuntimeException | AssertionError e) {
+throw new BPlusTreeRuntimeException(e, grpId);
+}
+
+return null;
+}
+
+/**
+ * Continues scanning the leaf pages of the tree.
+ *
+ * @param pageId Page id to start with.
+ * @return {@code true} if there is a row(s) to remove.
+ * @throws IgniteCheckedException If failed.
+ */
+private boolean next0(long pageId) throws IgniteCheckedException {
+for (;;) {
+if (pageId == 0L && lastRow != null) {
+ReinitHelper reinit = new ReinitHelper(lastRow);
+
+doFind(reinit);
+
+pageId = reinit.pageId();
+}
+
+long page = acquirePage(pageId);
+try {
+long pageAddr = readLock(pageId, page);
+
+if (pageAddr == 0L) { // this page is deleted. Find from 
root using lastRow.
+pageId = 0L;
+
+continue;
+}
+
+try {
+for (;;) {
+BPlusIO io = io(pageAddr);
+
+assert io.isLeaf();
+
+long nextPageId = io.getForward(pageAddr);
+
+int cnt = io.getCount(pageAddr);
+
+if (cnt == 0) {
+assert nextPageId == 0L;
+
+return false;
+}
+
+for (int idx = 0; idx < cnt; idx++) {
+if (clo.apply(BPlusTree.this, io, pageAddr, 
idx)) {
+lastRow = io.getLookupRow(BPlusTree.this, 
pageAddr, cnt - 1);
+curPageId = pageId;
+
+return true;
+}
+}
+lastRow = null;
+
+if (nextPageId == 0L)
+return false;
+
+long nextPage = acquirePage(nextPageId);
+try {
+long nextPageAddr = readLock(nextPageId, 
nextPage);
+
+assert nextPageAddr != 0L : "next page removed 
while back page is being locked";
+
+try {
+long pa 

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336924085
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##
 @@ -5728,6 +5746,332 @@ protected abstract Result run0(long pageId, long page, 
long pageAddr, BPlusIO
 }
 }
 
+/**
+ * Cursor that filters and removes rows.
+ *
+ * Most rows are expected to be deleted "in place" on a leaf page.
+ * The last row and the rightmost row on a page are deleted starting from 
root.
+ */
+private class PurgeCursor implements GridCursor {
+/** Row filter. */
+private final TreeRowClosure clo;
+
+/** Current page id. */
+private long curPageId = 0L;
+
+/** Last row to get back to if current page turns out removed. */
+private L lastRow = null;
+
+/** Row to remove starting "from root". */
+private L removeRow = null;
+
+/**
+ * Constructor.
+ *
+ * @param clo Row filter.
+ */
+PurgeCursor(TreeRowClosure clo) {
+this.clo = clo;
+}
+
+/**
+ * Initializes the cursor.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+void start() throws IgniteCheckedException {
+checkDestroyed();
+
+long metaPage = acquirePage(metaPageId);
+try {
+curPageId = getFirstPageId(metaPageId, metaPage, 0);
+}
+finally {
+releasePage(metaPageId, metaPage);
+}
+}
+
+/** {@inheritDoc} */
+@Override public boolean next() throws IgniteCheckedException {
+checkDestroyed();
+
+if (curPageId == 0L && lastRow == null)
+return false;
+
+return next0(curPageId);
+}
+
+/** {@inheritDoc} */
+@Override public Void get() throws IgniteCheckedException {
+checkDestroyed();
+
+try {
+doPurge(curPageId);
+
+if (removeRow != null) {
+doRemove(removeRow, false);
+
+lastRow = removeRow;
+removeRow = null;
+}
+}
+catch (RuntimeException | AssertionError e) {
+throw new BPlusTreeRuntimeException(e, grpId);
+}
+
+return null;
+}
+
+/**
+ * Continues scanning the leaf pages of the tree.
+ *
+ * @param pageId Page id to start with.
+ * @return {@code true} if there is a row(s) to remove.
+ * @throws IgniteCheckedException If failed.
+ */
+private boolean next0(long pageId) throws IgniteCheckedException {
+for (;;) {
+if (pageId == 0L && lastRow != null) {
+ReinitHelper reinit = new ReinitHelper(lastRow);
+
+doFind(reinit);
+
+pageId = reinit.pageId();
+}
+
+long page = acquirePage(pageId);
+try {
+long pageAddr = readLock(pageId, page);
+
+if (pageAddr == 0L) { // this page is deleted. Find from 
root using lastRow.
+pageId = 0L;
+
+continue;
+}
+
+try {
+for (;;) {
+BPlusIO io = io(pageAddr);
+
+assert io.isLeaf();
+
+long nextPageId = io.getForward(pageAddr);
+
+int cnt = io.getCount(pageAddr);
+
+if (cnt == 0) {
+assert nextPageId == 0L;
+
+return false;
+}
+
+for (int idx = 0; idx < cnt; idx++) {
+if (clo.apply(BPlusTree.this, io, pageAddr, 
idx)) {
+lastRow = io.getLookupRow(BPlusTree.this, 
pageAddr, cnt - 1);
+curPageId = pageId;
+
+return true;
+}
+}
+lastRow = null;
+
+if (nextPageId == 0L)
+return false;
+
+long nextPage = acquirePage(nextPageId);
+try {
+long nextPageAddr = readLock(nextPageId, 
nextPage);
+
+assert nextPageAddr != 0L : "next page removed 
while back page is being locked";
+
+try {
+long pa 

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336922320
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PurgeRecord.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Removal of multiple items (designated by their 0-based indexes) in an index 
leaf page.
+ */
+public class PurgeRecord extends PageDeltaRecord {
+/** Indexes of items to remove from the page. */
+private int[] items;
+
+/** Number of used elements in items array. */
+private int itemsCnt;
+
+/** Resulting count of items that should remain on the page. */
+private int cnt;
+
+/**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param items Indexes of items to remove on the page.
+ * @param itemsCnt Number of used elements in {@code items} array.
+ * @param cnt Resulting count of items that should remain on the page.
 
 Review comment:
   typo: Let's remove the word `Resulting` -> "Count of items that should remain on 
the page."


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336920198
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PurgeRecord.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Removal of multiple items (designated by their 0-based indexes) in an index 
leaf page.
+ */
+public class PurgeRecord extends PageDeltaRecord {
+/** Indexes of items to remove from the page. */
+private int[] items;
+
+/** Number of used elements in items array. */
+private int itemsCnt;
+
+/** Resulting count of items that should remain on the page. */
+private int cnt;
+
+/**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param items Indexes of items to remove on the page.
 
 Review comment:
   typo `on the page` -> `from the page`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336915993
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 ##
 @@ -5728,6 +5746,332 @@ protected abstract Result run0(long pageId, long page, 
long pageAddr, BPlusIO
 }
 }
 
+/**
+ * Cursor that filters and removes rows.
+ *
+ * Most rows are expected to be deleted "in place" on a leaf page.
+ * The last row and the rightmost row on a page are deleted starting from 
root.
+ */
+private class PurgeCursor implements GridCursor {
+/** Row filter. */
+private final TreeRowClosure clo;
+
+/** Current page id. */
+private long curPageId = 0L;
+
+/** Last row to get back to if current page turns out removed. */
+private L lastRow = null;
+
+/** Row to remove starting "from root". */
+private L removeRow = null;
+
+/**
+ * Constructor.
+ *
+ * @param clo Row filter.
+ */
+PurgeCursor(TreeRowClosure clo) {
+this.clo = clo;
+}
+
+/**
+ * Initializes the cursor.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+void start() throws IgniteCheckedException {
+checkDestroyed();
+
+long metaPage = acquirePage(metaPageId);
+try {
+curPageId = getFirstPageId(metaPageId, metaPage, 0);
+}
+finally {
+releasePage(metaPageId, metaPage);
+}
+}
+
+/** {@inheritDoc} */
+@Override public boolean next() throws IgniteCheckedException {
+checkDestroyed();
+
+if (curPageId == 0L && lastRow == null)
+return false;
+
+return next0(curPageId);
+}
+
+/** {@inheritDoc} */
+@Override public Void get() throws IgniteCheckedException {
+checkDestroyed();
+
+try {
+doPurge(curPageId);
+
+if (removeRow != null) {
+doRemove(removeRow, false);
+
+lastRow = removeRow;
+removeRow = null;
+}
+}
+catch (RuntimeException | AssertionError e) {
+throw new BPlusTreeRuntimeException(e, grpId);
+}
+
+return null;
+}
+
+/**
+ * Continues scanning the leaf pages of the tree.
+ *
+ * @param pageId Page id to start with.
+ * @return {@code true} if there is a row(s) to remove.
+ * @throws IgniteCheckedException If failed.
+ */
+private boolean next0(long pageId) throws IgniteCheckedException {
+for (;;) {
+if (pageId == 0L && lastRow != null) {
+ReinitHelper reinit = new ReinitHelper(lastRow);
+
+doFind(reinit);
+
+pageId = reinit.pageId();
+}
+
+long page = acquirePage(pageId);
+try {
+long pageAddr = readLock(pageId, page);
+
+if (pageAddr == 0L) { // this page is deleted. Find from 
root using lastRow.
+pageId = 0L;
+
+continue;
+}
+
+try {
+for (;;) {
+BPlusIO io = io(pageAddr);
+
+assert io.isLeaf();
+
+long nextPageId = io.getForward(pageAddr);
+
+int cnt = io.getCount(pageAddr);
+
+if (cnt == 0) {
+assert nextPageId == 0L;
+
+return false;
+}
+
+for (int idx = 0; idx < cnt; idx++) {
+if (clo.apply(BPlusTree.this, io, pageAddr, 
idx)) {
+lastRow = io.getLookupRow(BPlusTree.this, 
pageAddr, cnt - 1);
+curPageId = pageId;
+
+return true;
+}
+}
+lastRow = null;
+
+if (nextPageId == 0L)
+return false;
+
+long nextPage = acquirePage(nextPageId);
+try {
+long nextPageAddr = readLock(nextPageId, 
nextPage);
+
+assert nextPageAddr != 0L : "next page removed 
while back page is being locked";
+
+try {
+long pa 

[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336910596
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PurgeRecord.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Removal of multiple items (designated by their 0-based indexes) in an index 
leaf page.
+ */
+public class PurgeRecord extends PageDeltaRecord {
+/** Indexes of items to remove from the page. */
+private int[] items;
+
+/** Number of used elements in items array. */
+private int itemsCnt;
 
 Review comment:
   We assume that `itemsCnt` can be written as a short, but store it as an int.
   
   1. Let's store it as a `short` field.
   2. Let's assert in the constructor that `itemsCnt >= Short.MIN_VALUE && 
itemsCnt <= Short.MAX_VALUE` to prevent a conversion error.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [ignite] nizhikov commented on a change in pull request #6982: IGNITE-12295: Index partition purge for file-based rebalancing

2019-10-21 Thread GitBox
nizhikov commented on a change in pull request #6982: IGNITE-12295: Index 
partition purge for file-based rebalancing
URL: https://github.com/apache/ignite/pull/6982#discussion_r336907273
 
 

 ##
 File path: 
modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/PurgeRecord.java
 ##
 @@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Removal of multiple items (designated by their 0-based indexes) in an index 
leaf page.
+ */
+public class PurgeRecord extends PageDeltaRecord {
+/** Indexes of items to remove from the page. */
+private int[] items;
+
+/** Number of used elements in items array. */
+private int itemsCnt;
+
+/** Resulting count of items that should remain on the page. */
+private int cnt;
+
+/**
+ * @param grpId Cache group ID.
+ * @param pageId Page ID.
+ * @param items Indexes of items to remove on the page.
+ * @param itemsCnt Number of used elements in {@code items} array.
+ * @param cnt Resulting count of items that should remain on the page.
+ */
+public PurgeRecord(int grpId, long pageId, int[] items, int itemsCnt, int 
cnt) {
+super(grpId, pageId);
+
+assert itemsCnt > 0 && itemsCnt <= items.length;
+
+this.items = items;
+this.itemsCnt = itemsCnt;
+this.cnt = cnt;
+}
+
+/** {@inheritDoc} */
+@Override public void applyDelta(PageMemory pageMem, long pageAddr) throws 
IgniteCheckedException {
+BPlusIO io = PageIO.getBPlusIO(pageAddr);
+
+int cnt0 = io.getCount(pageAddr);
+
+assert cnt0 == cnt + itemsCnt : "unexpected count: cnt0=" + cnt0 + ", 
cnt=" + cnt + ", itemsCnt=" + itemsCnt;
 
 Review comment:
   Let's assert that `pageAddr` points to the expected page type.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services