ascherbakoff commented on a change in pull request #7705:
URL: https://github.com/apache/ignite/pull/7705#discussion_r473817435
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -824,13 +876,15 @@ private void preloadEntries(AffinityTopologyVersion
topVer, int p,
* @param row Data row.
* @param topVer Topology version.
* @return {@code True} if the initial value was set for the specified
cache entry.
+ * @param node Node which sent entry.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion
topVer) throws IgniteCheckedException {
+ private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion
topVer,
+ ClusterNode node) throws IgniteCheckedException {
assert !grp.mvccEnabled();
assert ctx.database().checkpointLockIsHeldByThread();
- rebalanceFut.receivedKeys.incrementAndGet();
+ rebalanceFut.onReceivedKeys(row.partition(), node);
Review comment:
onReceivedKeys can be called for a whole batch to avoid overhead:
```
preloadEntries(topVer, p, infos);
rebalanceFut.onReceivedKeys(p, infos.size(), node);
```
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1641,17 +1723,41 @@ private synchronized void partitionDone(UUID nodeId,
int p, boolean own) {
assert rmvd : "Partition already done [grp=" +
grp.cacheOrGroupName() + ", fromNode=" + nodeId +
", part=" + p + ", left=" + parts + "]";
- if (rmvd)
- partitionsLeft.decrementAndGet();
+ if (rmvd)
+ partitionsLeft.decrementAndGet();
if (parts.isEmpty()) {
int remainingRoutines = remaining.size() - 1;
- U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final)
" : "") +
+ int fullParts = fullRebalancingParts.get(nodeId).size();
+ int histParts = histRebalancingParts.get(nodeId).size();
+
+ long fullEntries = fullReceivedKeys.get(nodeId).sum();
Review comment:
I mean make a separate function for preparing and printing a log message
starting from line 1721 until 1752.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1837,6 +1935,87 @@ public boolean
compatibleWith(GridDhtPreloaderAssignments newAssignments) {
return true;
}
+ /**
+ * Callback when getting entry from supplier.
+ *
+ * @param p Partition.
+ * @param node Supplier.
+ */
+ private void onReceivedKeys(int p, ClusterNode node) {
+ receivedKeys.incrementAndGet();
+
+ boolean hist = assignments.get(node).partitions().hasHistorical(p);
+ (hist ? histReceivedKeys :
fullReceivedKeys).get(node.id()).increment();
+ }
+
+ /**
+ * Callback when getting size of received entries in bytes.
+ *
+ * @param p Partition.
+ * @param bytes Byte count.
+ * @param node Supplier.
+ */
+ private void onReceivedBytes(int p, long bytes, ClusterNode node) {
+ boolean hist = assignments.get(node).partitions().hasHistorical(p);
+ (hist ? histReceivedBytes :
fullReceivedBytes).get(node.id()).add(bytes);
+ }
+
+ /**
+ * Checks if rebalance chain has completed.
+ *
+ * @return {@code true} if rebalance chain completed.
+ */
+ private boolean isChainFinish() {
+ RebalanceFuture fut = this;
+
+ while (nonNull(fut)) {
+ if (fut.isInitial() || !fut.isDone() || !fut.result())
+ return false;
+ else
+ fut = fut.next;
+ }
+
+ return true;
+ }
Review comment:
It still doesn't look correct to me.
Consider a rebalance chain g1 -> g2 ->g3 for topVer=N, g2 is currently
rebalancing.
Topology is changed to N + 1, g3 is incompatible and cancelled.
After g2 has been finished rebalancing a chain will not be completed because
g3.rebFut.result=false, but it should.
Also, no need to copy reference to _this_.
The correct implementation for me looks something like:
```
private boolean isChainFinished() {
assert isDone() : this;
if (isInitial() || !result())
return false;
RebalanceFuture fut = next;
while (fut != null) {
if (fut.isInitial() || !fut.isDone())
return false;
else
fut = fut.next;
}
return true;
}
```
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1837,6 +1935,87 @@ public boolean
compatibleWith(GridDhtPreloaderAssignments newAssignments) {
return true;
}
+ /**
+ * Callback when getting entry from supplier.
+ *
+ * @param p Partition.
+ * @param node Supplier.
+ */
+ private void onReceivedKeys(int p, ClusterNode node) {
+ receivedKeys.incrementAndGet();
+
+ boolean hist = assignments.get(node).partitions().hasHistorical(p);
+ (hist ? histReceivedKeys :
fullReceivedKeys).get(node.id()).increment();
+ }
+
+ /**
+ * Callback when getting size of received entries in bytes.
+ *
+ * @param p Partition.
+ * @param bytes Byte count.
+ * @param node Supplier.
+ */
+ private void onReceivedBytes(int p, long bytes, ClusterNode node) {
+ boolean hist = assignments.get(node).partitions().hasHistorical(p);
+ (hist ? histReceivedBytes :
fullReceivedBytes).get(node.id()).add(bytes);
+ }
+
+ /**
+ * Checks if rebalance chain has completed.
+ *
+ * @return {@code true} if rebalance chain completed.
+ */
+ private boolean isChainFinish() {
+ RebalanceFuture fut = this;
+
+ while (nonNull(fut)) {
+ if (fut.isInitial() || !fut.isDone() || !fut.result())
+ return false;
+ else
+ fut = fut.next;
+ }
+
+ return true;
+ }
+
+ /**
+ * Callback when rebalance chain ends.
+ */
+ private void onChainFinish() {
Review comment:
better onChainFinished
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]