ascherbakoff commented on a change in pull request #7705:
URL: https://github.com/apache/ignite/pull/7705#discussion_r468474840



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -824,17 +842,31 @@ private void preloadEntries(AffinityTopologyVersion topVer, int p,
      * @param row Data row.
      * @param topVer Topology version.
      * @return {@code True} if the initial value was set for the specified cache entry.
+     * @param node Node which sent entry.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+    private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion topVer,
+        ClusterNode node) throws IgniteCheckedException {
         assert !grp.mvccEnabled();
         assert ctx.database().checkpointLockIsHeldByThread();
 
-        rebalanceFut.receivedKeys.incrementAndGet();
+        RebalanceFuture fut = rebalanceFut;
+
+        fut.receivedKeys.incrementAndGet();
+
+        CacheObject val = row.value();
+
+        CacheObjectContext cacheObjCtx = grp.cacheObjectContext();
+        long bytes = row.key().valueBytes(cacheObjCtx).length + (isNull(val) ? 0 : val.valueBytes(cacheObjCtx).length);
+
+        boolean hist = fut.assignments.get(node).partitions().hasHistorical(row.partition());
+
+        (hist ? fut.histReceivedKeys : fut.fullReceivedKeys).get(node.id()).increment();
+        (hist ? fut.histReceivedBytes : fut.fullReceivedBytes).get(node.id()).add(bytes);
 
         updateGroupMetrics();
 
-        GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(row.cacheId()) : grp.singleCacheContext();
+        GridCacheContext cctx = grp.sharedGroup() ? this.ctx.cacheContext(row.cacheId()) : grp.singleCacheContext();

Review comment:
       What is the point of adding `this.` here? Why don't you add `this.` on line 851?
   If I recall correctly, we have no code style rule requiring `this.` for class fields.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
##########
@@ -68,7 +68,7 @@ public GridCachePreloaderAdapter(CacheGroupContext grp) {
 
         log = ctx.logger(getClass());
 
-        finFut = new GridFinishedFuture();
+        finFut = new GridFinishedFuture<>(true);

Review comment:
       I don't see an obvious reason for this change. Can you explain it?

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -780,7 +785,19 @@ private void mvccPreloadEntries(AffinityTopologyVersion topVer, ClusterNode node
                         if (cctx != null) {
                             mvccPreloadEntry(cctx, node, entryHist, topVer, p);
 
-                            rebalanceFut.receivedKeys.incrementAndGet();
+                            RebalanceFuture fut = rebalanceFut;
+
+                            fut.receivedKeys.incrementAndGet();
+
+                            long bytes = 0;
+
+                            for (GridCacheMvccEntryInfo entryInfo : entryHist)
+                                bytes += entryInfo.marshalledSize(grp.cacheObjectContext());
+
+                            boolean hist = fut.assignments.get(node).partitions().hasHistorical(p);
+
+                            (hist ? fut.histReceivedKeys : fut.fullReceivedKeys).get(node.id()).increment();
+                            (hist ? fut.histReceivedBytes : fut.fullReceivedBytes).get(node.id()).add(bytes);

Review comment:
       You have introduced a lot of copy-pasted code and exposed the object's internal state; I don't like how this is implemented.
   
   The received bytes size can be calculated before calling (mvcc)preloadEntries (line 598) in the following way, using a wrapping iterator to avoid iterating twice:
   
   ```
   Iterator<GridCacheEntryInfo> infos = e.getValue().infos().iterator();
   
   // Accumulates the marshalled size of each entry returned by nextX(),
   // so the entries do not have to be iterated a second time.
   final int[] size = new int[1];
   GridIterableAdapter<GridCacheEntryInfo> wrapper = new GridIterableAdapter<GridCacheEntryInfo>(infos) {
       @Override public GridCacheEntryInfo nextX() throws IgniteCheckedException {
           GridCacheEntryInfo info = super.next();
   
           size[0] += info.marshalledSize(ctx.cacheObjectContext(info.cacheId()));
   
           return info;
       }
   };
   
   try {
       if (grp.mvccEnabled())
           mvccPreloadEntries(topVer, node, p, wrapper);
       else
           preloadEntries(topVer, p, wrapper);
   }
   catch (GridDhtInvalidPartitionException ignored) {
       if (log.isDebugEnabled())
           log.debug("Partition became invalid during rebalancing (will ignore): " + p);
   }
   
   // Proposed RebalanceFuture method: report the bytes received for partition p from this supplier.
   fut.onReceivedBytes(p, size[0], node);
   ```
   
   The whole block at lines 788-800 can be replaced with:
   
                                   fut.onReceivedKeys(p, 1, node);
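The same single-pass trick, reduced to plain `java.util.Iterator` so that it runs on its own: a decorator adds up a per-element size inside `next()` while handing each element through unchanged. `SizeCountingIterator` and the `ToIntFunction` sizer are hypothetical stand-ins for `GridIterableAdapter` and `marshalledSize`; this is a sketch of the pattern, not Ignite code.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical decorator: sums the size of every element a consumer pulls through it. */
public final class SizeCountingIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final ToIntFunction<? super T> sizer;
    private long totalSize;

    public SizeCountingIterator(Iterator<T> delegate, ToIntFunction<? super T> sizer) {
        this.delegate = delegate;
        this.sizer = sizer;
    }

    @Override public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override public T next() {
        T item = delegate.next();

        // Count the element exactly once, at the moment the consumer takes it.
        totalSize += sizer.applyAsInt(item);

        return item;
    }

    /** Total size of all elements returned so far. */
    public long totalSize() {
        return totalSize;
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("a", "bb", "cccc");
        SizeCountingIterator<String> it = new SizeCountingIterator<>(entries.iterator(), String::length);

        int consumed = 0;

        // Stands in for the single pass done by preloadEntries.
        while (it.hasNext()) {
            it.next();
            consumed++;
        }

        System.out.println(consumed + " entries, " + it.totalSize() + " bytes"); // 3 entries, 7 bytes
    }
}
```

Because the size is added when the consumer takes an element, the total is complete only after the iterator has been drained, which is why the review's snippet reports `size[0]` after the `try` block. The decorator is meant for a single consuming thread.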

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1641,17 +1723,41 @@ private synchronized void partitionDone(UUID nodeId, int p, boolean own) {
             assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
                 ", part=" + p + ", left=" + parts + "]";
 
-                if (rmvd)
-                    partitionsLeft.decrementAndGet();
+            if (rmvd)
+                partitionsLeft.decrementAndGet();
 
             if (parts.isEmpty()) {
                 int remainingRoutines = remaining.size() - 1;
 
-                U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
+                int fullParts = fullRebalancingParts.get(nodeId).size();
+                int histParts = histRebalancingParts.get(nodeId).size();
+
+                long fullEntries = fullReceivedKeys.get(nodeId).sum();
+                long histEntries = histReceivedKeys.get(nodeId).sum();
+
+                long fullBytes = fullReceivedBytes.get(nodeId).sum();
+                long histBytes = histReceivedBytes.get(nodeId).sum();
+
+                long duration = System.currentTimeMillis() - startTime;
+                long durationSec = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(duration));
+
+                U.log(log, "Completed " + (remainingRoutines == 0 ? "(final) " : "") +
                     "rebalancing [grp=" + grp.cacheOrGroupName() +
                     ", supplier=" + nodeId +
+                    ", partitions=" + (fullParts + histParts) +
+                    ", entries=" + (fullEntries + histEntries) +
+                    ", duration=" + duration + "ms" +
+                    ", bytesRcvd=" + U.humanReadableByteCount(fullBytes + histBytes) +
+                    ", avgSpeed=" + U.humanReadableByteCount((fullBytes + histBytes) / durationSec) + "/sec" +

Review comment:
       This is not a correct definition of rebalancing speed and may confuse the user.
   What this value actually represents is `bandwidth`.
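As a worked illustration (not part of the patch): the value logged as `avgSpeed` is received bytes divided by elapsed time, i.e. an average bandwidth. In the sketch below, the class and method names are hypothetical. `patchFormula` mirrors the patch's division by whole seconds, while `bandwidthBytesPerSec` is an assumed millisecond-precision variant, shown only to make the arithmetic visible.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers computing average rebalance bandwidth for one supplier. */
public final class RebalanceBandwidth {
    private RebalanceBandwidth() {
    }

    /** Bytes per second using millisecond precision; a zero duration is treated as 1 ms. */
    public static long bandwidthBytesPerSec(long bytesRcvd, long durationMs) {
        return bytesRcvd * 1000 / Math.max(1, durationMs);
    }

    /** The patch's formula: duration truncated to whole seconds, but at least 1 s. */
    public static long patchFormula(long bytesRcvd, long durationMs) {
        long durationSec = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(durationMs));

        return bytesRcvd / durationSec;
    }

    public static void main(String[] args) {
        long bytes = 10_000_000L;
        long durationMs = 2_500L;

        System.out.println(bandwidthBytesPerSec(bytes, durationMs)); // 4000000
        System.out.println(patchFormula(bytes, durationMs));         // 5000000
    }
}
```

With 10,000,000 bytes received in 2,500 ms, the whole-second formula reports 5,000,000 bytes/s because 2.5 s is truncated to 2 s, while the millisecond variant reports 4,000,000 bytes/s.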
   

##########
File path: 
modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
##########
@@ -167,9 +167,9 @@ public void record(Class<?>... recordClasses) {
      * @param stopRecord Stop record flag.
      * @return Recorded messages.
      */
-    public List<Object> recordedMessages(boolean stopRecord) {
+    public List<T2<ClusterNode, Object>> recordedMessages(boolean stopRecord) {

Review comment:
       This change triggered a lot of dependent changes unrelated to the patch, making it bigger without real value.
   Can you revert it and fix your test so that it does not use this method?
   You can easily implement your own recording using `blockMessages` and a predicate that returns `false`.
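A minimal sketch of such a recording predicate, written against `java.util.function.BiPredicate` so that it runs standalone. `RecordingPredicate` is hypothetical; in a real Ignite test the same body would go into the predicate passed to `TestRecordingCommunicationSpi#blockMessages`, which, as far as I know, takes an `IgniteBiPredicate<ClusterNode, Message>` (please check the exact signature). Returning `false` means no message is ever blocked, so the predicate only observes traffic.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiPredicate;

/** Hypothetical predicate that records (node, message) pairs and never blocks anything. */
public final class RecordingPredicate<N, M> implements BiPredicate<N, M> {
    /** Thread-safe list, since communication threads may call the predicate concurrently. */
    private final List<Map.Entry<N, M>> recorded = new CopyOnWriteArrayList<>();

    /** Only messages of this class are recorded. */
    private final Class<?> msgCls;

    public RecordingPredicate(Class<?> msgCls) {
        this.msgCls = msgCls;
    }

    @Override public boolean test(N node, M msg) {
        if (msgCls.isInstance(msg))
            recorded.add(new SimpleImmutableEntry<>(node, msg));

        // Never block: the message is delivered as usual.
        return false;
    }

    /** Recorded pairs in arrival order. */
    public List<Map.Entry<N, M>> recorded() {
        return recorded;
    }

    public static void main(String[] args) {
        RecordingPredicate<String, Object> pred = new RecordingPredicate<>(String.class);

        System.out.println(pred.test("node1", "demand-msg")); // false
        System.out.println(pred.test("node2", 42));           // false
        System.out.println(pred.recorded().size());           // 1
    }
}
```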

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1641,17 +1723,41 @@ private synchronized void partitionDone(UUID nodeId, int p, boolean own) {
             assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
                 ", part=" + p + ", left=" + parts + "]";
 
-                if (rmvd)
-                    partitionsLeft.decrementAndGet();
+            if (rmvd)
+                partitionsLeft.decrementAndGet();
 
             if (parts.isEmpty()) {
                 int remainingRoutines = remaining.size() - 1;
 
-                U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
+                int fullParts = fullRebalancingParts.get(nodeId).size();
+                int histParts = histRebalancingParts.get(nodeId).size();
+
+                long fullEntries = fullReceivedKeys.get(nodeId).sum();

Review comment:
       Can you refactor lines 1732-1760 into a separate method?
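One possible shape for that refactoring, as a sketch: a small value object holds the per-supplier totals and formats the log line, so `partitionDone()` only gathers the numbers and logs the result. `RebalanceSummary` and `toLogString` are hypothetical names; the byte counters are left out for brevity, and `rebalanceId` is placed first in the message.

```java
import java.util.UUID;

/** Hypothetical holder of one supplier's rebalance totals, used only to build the log line. */
public final class RebalanceSummary {
    private final long rebalanceId;
    private final String grpName;
    private final UUID supplier;
    private final int fullParts;
    private final int histParts;
    private final long fullEntries;
    private final long histEntries;
    private final long durationMs;

    public RebalanceSummary(long rebalanceId, String grpName, UUID supplier,
        int fullParts, int histParts, long fullEntries, long histEntries, long durationMs) {
        this.rebalanceId = rebalanceId;
        this.grpName = grpName;
        this.supplier = supplier;
        this.fullParts = fullParts;
        this.histParts = histParts;
        this.fullEntries = fullEntries;
        this.histEntries = histEntries;
        this.durationMs = durationMs;
    }

    /** Builds the "Completed rebalancing" line; {@code last} marks the final routine. */
    public String toLogString(boolean last) {
        return "Completed " + (last ? "(final) " : "") +
            "rebalancing [rebalanceId=" + rebalanceId +
            ", grp=" + grpName +
            ", supplier=" + supplier +
            ", partitions=" + (fullParts + histParts) +
            ", entries=" + (fullEntries + histEntries) +
            ", duration=" + durationMs + "ms" +
            ", histPartitions=" + histParts +
            ", histEntries=" + histEntries +
            ", fullPartitions=" + fullParts +
            ", fullEntries=" + fullEntries + "]";
    }

    public static void main(String[] args) {
        RebalanceSummary s = new RebalanceSummary(7, "cache1", new UUID(0, 1), 3, 2, 300, 20, 1500);

        System.out.println(s.toLogString(true));
    }
}
```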

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1641,17 +1723,41 @@ private synchronized void partitionDone(UUID nodeId, int p, boolean own) {
             assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
                 ", part=" + p + ", left=" + parts + "]";
 
-                if (rmvd)
-                    partitionsLeft.decrementAndGet();
+            if (rmvd)
+                partitionsLeft.decrementAndGet();
 
             if (parts.isEmpty()) {
                 int remainingRoutines = remaining.size() - 1;
 
-                U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
+                int fullParts = fullRebalancingParts.get(nodeId).size();
+                int histParts = histRebalancingParts.get(nodeId).size();

Review comment:
       Let's get rid of fullRebalancingParts and histRebalancingParts. For example, you can get the number of full partitions for a node from `fut.rebalancingParts` and `fut.historical` using a filter like:
   `rebalancingParts.get(node).stream().filter(p -> !historical.contains(p)).count();`
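The filter can be checked in isolation with a small standalone sketch. The generic key `N` stands in for the supplier node type (the original map is keyed by `ClusterNode`), and `PartitionCounts` is a hypothetical helper; partitions are classified only by whether they appear in `historical`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper deriving full/historical partition counts from a single map. */
public final class PartitionCounts {
    private PartitionCounts() {
    }

    /** Partitions scheduled from {@code supplier} that are rebalanced in full (not historically). */
    public static <N> long fullParts(Map<N, Set<Integer>> rebalancingParts, Set<Integer> historical, N supplier) {
        return rebalancingParts.getOrDefault(supplier, Collections.emptySet()).stream()
            .filter(p -> !historical.contains(p))
            .count();
    }

    /** Partitions scheduled from {@code supplier} that are rebalanced historically. */
    public static <N> long histParts(Map<N, Set<Integer>> rebalancingParts, Set<Integer> historical, N supplier) {
        return rebalancingParts.getOrDefault(supplier, Collections.emptySet()).stream()
            .filter(historical::contains)
            .count();
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> parts = new HashMap<>();
        parts.put("A", new HashSet<>(Arrays.asList(1, 2, 3, 4)));
        parts.put("B", new HashSet<>(Arrays.asList(5, 6)));

        Set<Integer> historical = new HashSet<>(Arrays.asList(2, 5));

        System.out.println(fullParts(parts, historical, "A")); // 3
        System.out.println(histParts(parts, historical, "A")); // 1
        System.out.println(fullParts(parts, historical, "B")); // 1
    }
}
```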

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1108,8 +1140,23 @@ private void updateGroupMetrics() {
         /** Assigment. */
         private final GridDhtPreloaderAssignments assignments;
 
-        /** Partitions which have been scheduled for rebalance from specific supplier. */
-        private final Map<ClusterNode, Set<Integer>> rebalancingParts;
+        /** Partitions which have been scheduled for full rebalance from specific supplier. */
+        private final Map<UUID, Set<Integer>> fullRebalancingParts;
+
+        /** Partitions which have been scheduled for historical rebalance from specific supplier. */
+        private final Map<UUID, Set<Integer>> histRebalancingParts;
+
+        /** Received keys for full rebalance by supplier. */
+        private final Map<UUID, LongAdder> fullReceivedKeys = new ConcurrentHashMap<>();
+
+        /** Received bytes for full rebalance by supplier. */
+        private final Map<UUID, LongAdder> fullReceivedBytes = new ConcurrentHashMap<>();
+
+        /** Received keys for historical rebalance by suppliers. */
+        private final Map<UUID, LongAdder> histReceivedKeys = new ConcurrentHashMap<>();
+
+        /** Received keys for historical rebalance by suppliers. */
+        private final Map<UUID, LongAdder> histReceivedBytes = new ConcurrentHashMap<>();

Review comment:
       These counters look like good candidates for new metrics.
   Check how, for example, `RebalanceFuture#receivedBytes` is implemented as a metric.
   Let's do the same for these.
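Whatever the metric wiring ends up looking like, a first step is to stop exposing the raw maps. The sketch below is plain Java, not Ignite's metric API: a hypothetical `SupplierRebalanceCounters` class keeps the per-supplier full/historical counters private and exposes them only through `onReceivedKeys`/`onReceivedBytes` (names borrowed from the suggestions elsewhere in this review) and read accessors. Registering the values as metrics, the way `RebalanceFuture#receivedBytes` is, is not shown.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-supplier rebalance counters, split into full and historical rebalancing. */
public final class SupplierRebalanceCounters {
    private final Map<UUID, LongAdder> fullKeys = new ConcurrentHashMap<>();
    private final Map<UUID, LongAdder> histKeys = new ConcurrentHashMap<>();
    private final Map<UUID, LongAdder> fullBytes = new ConcurrentHashMap<>();
    private final Map<UUID, LongAdder> histBytes = new ConcurrentHashMap<>();

    /** Adds {@code keys} received from {@code supplier}; {@code hist} selects the historical counter. */
    public void onReceivedKeys(UUID supplier, boolean hist, long keys) {
        adder(hist ? histKeys : fullKeys, supplier).add(keys);
    }

    /** Adds {@code bytes} received from {@code supplier}; {@code hist} selects the historical counter. */
    public void onReceivedBytes(UUID supplier, boolean hist, long bytes) {
        adder(hist ? histBytes : fullBytes, supplier).add(bytes);
    }

    public long receivedKeys(UUID supplier, boolean hist) {
        return sum(hist ? histKeys : fullKeys, supplier);
    }

    public long receivedBytes(UUID supplier, boolean hist) {
        return sum(hist ? histBytes : fullBytes, supplier);
    }

    /** Lazily creates the counter, so callers never see a missing entry. */
    private static LongAdder adder(Map<UUID, LongAdder> m, UUID supplier) {
        return m.computeIfAbsent(supplier, id -> new LongAdder());
    }

    private static long sum(Map<UUID, LongAdder> m, UUID supplier) {
        LongAdder a = m.get(supplier);

        return a == null ? 0 : a.sum();
    }

    public static void main(String[] args) {
        SupplierRebalanceCounters c = new SupplierRebalanceCounters();
        UUID s = new UUID(0, 1);

        c.onReceivedKeys(s, false, 10);
        c.onReceivedKeys(s, true, 3);

        System.out.println(c.receivedKeys(s, false) + " full keys, " + c.receivedKeys(s, true) + " historical keys");
    }
}
```

The caller is expected to derive `hist` from the partition, for example with `historical.contains(p)`.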

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1409,6 +1456,41 @@ public void requestPartitions() {
                     // Complete sync future only if rebalancing was not cancelled.
                     if (res && !grp.preloader().syncFuture().isDone())
                         ((GridFutureAdapter)grp.preloader().syncFuture()).onDone();
+
+                    // Finish rebalance chain.
+                    if (isNull(next) && log.isInfoEnabled()) {

Review comment:
       There is already a check for `next == null` on line 1496.
   No need to check twice; add this code to the else block:
   
   if (next != null)
       ...
   else if (log.isInfoEnabled())
       ...
   
   Additionally, I have a big concern about how you've defined the finish of a rebalance chain. It looks incorrect.
   Consider the following scenario:
   1. Preloading is in progress for the chain g1 -> g2 -> g3 for topVer=N.
   2. topVer changes to N+1 while g1 is preloading.
   3. The new chain is compatible with the previous one for groups g1 and g2, so the g1 and g2 futures are not cancelled, but g3 is not compatible and its future will be cancelled. In this scenario you will report a finished chain, but this is not true: g1 is still rebalancing.
   
   Let's define a chain finish in the following way:
   
   A chain for topVer=N is finished if all rebalance futures in the chain are done.
   For the above scenario we have two chains: the first should be finished after g2 is rebalanced, the second after g3 is rebalanced.
   
   The chain duration is the time between sending the first demand message and the moment all rebalance futures in the chain are done. Make sure it is computed correctly for compatible chains.
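The chain-finish rule above can be sketched with `java.util.concurrent.CompletableFuture` standing in for the rebalance futures (an assumption; the real code uses Ignite's own future classes). `RebalanceChain` is a hypothetical class: it treats a future as done whether it completed, failed or was cancelled, and measures the duration from a caller-supplied time of the first demand message. How that start time should be chosen for a chain that reuses futures from a compatible predecessor is exactly what the review asks to verify, so the sketch leaves it to the caller.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

/** Hypothetical model of one rebalance chain for a single topology version. */
public final class RebalanceChain {
    private final long topVer;

    /** Completes with the chain duration in milliseconds once every future in the chain is done. */
    private final CompletableFuture<Long> finished;

    public RebalanceChain(long topVer, List<? extends CompletableFuture<?>> futs,
        long firstDemandSentMs, LongSupplier clockMs) {
        this.topVer = topVer;

        CompletableFuture<?>[] arr = futs.toArray(new CompletableFuture<?>[0]);

        // allOf() completes (possibly exceptionally) only after all inputs are done,
        // so handle() runs once, even if some futures were cancelled.
        finished = CompletableFuture.allOf(arr)
            .handle((res, err) -> clockMs.getAsLong() - firstDemandSentMs);
    }

    public long topologyVersion() {
        return topVer;
    }

    public CompletableFuture<Long> finished() {
        return finished;
    }

    public static void main(String[] args) {
        long[] now = {1_000L};

        CompletableFuture<Boolean> g1 = new CompletableFuture<>();
        CompletableFuture<Boolean> g2 = new CompletableFuture<>();
        CompletableFuture<Boolean> g3 = new CompletableFuture<>();

        // Chain for topVer=N (g1 -> g2 -> g3); the first demand message was sent at t=0.
        RebalanceChain chainN = new RebalanceChain(1, Arrays.asList(g1, g2, g3), 0L, () -> now[0]);

        // topVer=N+1 is incompatible with g3 only, so only g3 is cancelled.
        g3.cancel(false);
        System.out.println(chainN.finished().isDone()); // false: g1 and g2 are still running

        now[0] = 5_000L;
        g1.complete(true);
        g2.complete(true);
        System.out.println(chainN.finished().join()); // 5000
    }
}
```

Cancelling g3 alone does not finish the topVer=N chain; it finishes only once g1 and g2 are also done, which is the behaviour the scenario above calls for.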
   
   
   
   
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -824,17 +842,31 @@ private void preloadEntries(AffinityTopologyVersion topVer, int p,
      * @param row Data row.
      * @param topVer Topology version.
      * @return {@code True} if the initial value was set for the specified cache entry.
+     * @param node Node which sent entry.
      * @throws IgniteInterruptedCheckedException If interrupted.
      */
-    private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion topVer) throws IgniteCheckedException {
+    private boolean preloadEntry(CacheDataRow row, AffinityTopologyVersion topVer,
+        ClusterNode node) throws IgniteCheckedException {
         assert !grp.mvccEnabled();
         assert ctx.database().checkpointLockIsHeldByThread();
 
-        rebalanceFut.receivedKeys.incrementAndGet();
+        RebalanceFuture fut = rebalanceFut;
+
+        fut.receivedKeys.incrementAndGet();
+
+        CacheObject val = row.value();
+
+        CacheObjectContext cacheObjCtx = grp.cacheObjectContext();
+        long bytes = row.key().valueBytes(cacheObjCtx).length + (isNull(val) ? 0 : val.valueBytes(cacheObjCtx).length);
+
+        boolean hist = fut.assignments.get(node).partitions().hasHistorical(row.partition());
+
+        (hist ? fut.histReceivedKeys : fut.fullReceivedKeys).get(node.id()).increment();
+        (hist ? fut.histReceivedBytes : fut.fullReceivedBytes).get(node.id()).add(bytes);

Review comment:
       See the similar comment for line 788.
   The whole block should be removed; instead, you can add a line just after preloadEntries:
   
   `fut.onKeysReceived(p, e.getValue().infos().size());`

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1786,12 +1892,15 @@ public boolean compatibleWith(GridDhtPreloaderAssignments newAssignments) {
             Set<Integer> p1 = new HashSet<>();
 
             // Not compatible if a supplier has left.
-            for (ClusterNode node : rebalancingParts.keySet()) {
-                if (!grp.cacheObjectContext().kernalContext().discovery().alive(node))
+            for (UUID nodeId : fullRebalancingParts.keySet()) {

Review comment:
       You have broken the chain compatibility logic.
   Both full and historical suppliers must be checked.
   Can you revert this change and keep rebalancingParts untouched?
   fullRebalancingParts and histRebalancingParts should also be removed completely, because they are not needed.
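A minimal sketch of the supplier check, assuming a single map keyed by every supplier (full and historical), as `rebalancingParts` is in the original code. The liveness test is passed in as a `Predicate` standing in for `discovery().alive(node)`, and `SupplierCheck` is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical helper for the "not compatible if a supplier has left" rule. */
public final class SupplierCheck {
    private SupplierCheck() {
    }

    /** Returns {@code false} if any supplier in the map, full or historical, has left. */
    public static <N> boolean allSuppliersAlive(Map<N, Set<Integer>> rebalancingParts, Predicate<N> alive) {
        for (N node : rebalancingParts.keySet()) {
            if (!alive.test(node))
                return false;
        }

        return true;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> parts = new HashMap<>();
        parts.put("A", Collections.singleton(1)); // Supplier of a full-rebalance partition.
        parts.put("B", Collections.singleton(2)); // Supplier of a historical partition.

        System.out.println(allSuppliersAlive(parts, n -> true));           // true
        System.out.println(allSuppliersAlive(parts, n -> !n.equals("B"))); // false
    }
}
```

If the loop instead ran over a map that listed only full-rebalance suppliers, a supplier serving only historical partitions could leave without the chain being marked incompatible.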
   

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -447,7 +452,7 @@ public void registerSupplyMessage(final UUID nodeId, final GridDhtPartitionSuppl
             for (Integer p : supplyMsg.infos().keySet()) {
                 fut.queued.get(p).increment();
 
-                if (fut.historical.contains(p))
+                if (fut.histRebalancingParts.get(nodeId).contains(p))

Review comment:
       This change should be reverted, see below.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1641,17 +1723,41 @@ private synchronized void partitionDone(UUID nodeId, int p, boolean own) {
             assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
                 ", part=" + p + ", left=" + parts + "]";
 
-                if (rmvd)
-                    partitionsLeft.decrementAndGet();
+            if (rmvd)
+                partitionsLeft.decrementAndGet();
 
             if (parts.isEmpty()) {
                 int remainingRoutines = remaining.size() - 1;
 
-                U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
+                int fullParts = fullRebalancingParts.get(nodeId).size();
+                int histParts = histRebalancingParts.get(nodeId).size();
+
+                long fullEntries = fullReceivedKeys.get(nodeId).sum();
+                long histEntries = histReceivedKeys.get(nodeId).sum();
+
+                long fullBytes = fullReceivedBytes.get(nodeId).sum();
+                long histBytes = histReceivedBytes.get(nodeId).sum();
+
+                long duration = System.currentTimeMillis() - startTime;
+                long durationSec = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(duration));
+
+                U.log(log, "Completed " + (remainingRoutines == 0 ? "(final) " : "") +
                     "rebalancing [grp=" + grp.cacheOrGroupName() +
                     ", supplier=" + nodeId +
+                    ", partitions=" + (fullParts + histParts) +
+                    ", entries=" + (fullEntries + histEntries) +
+                    ", duration=" + duration + "ms" +

Review comment:
       Rebalancing duration can be rather long and is not human-readable in milliseconds.
   Can you implement a utility method `durationToString`, similar to `humanReadableByteCount`?
   It should output human-readable durations, like 1h20m14s. See, for example,
   `PeriodFormat.getDefault().print(duration.toPeriod())`
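A minimal sketch of `durationToString` that uses `java.time.Duration` instead of Joda-Time's `PeriodFormat`, so no extra dependency is needed. The enclosing class `Durations`, omitting leading zero units, and printing plain milliseconds below one second are assumptions, not requirements from the review. It expects a non-negative duration.

```java
import java.time.Duration;

/** Hypothetical holder for the proposed durationToString utility. */
public final class Durations {
    private Durations() {
    }

    /**
     * Formats a non-negative duration in milliseconds as, e.g., "1h20m14s".
     * Leading zero units are omitted; values below one second are printed in milliseconds.
     */
    public static String durationToString(long millis) {
        if (millis < 0)
            throw new IllegalArgumentException("Negative duration: " + millis);

        if (millis < 1_000)
            return millis + "ms";

        Duration d = Duration.ofMillis(millis);

        long hours = d.toHours();
        long mins = d.toMinutes() % 60;
        long secs = d.getSeconds() % 60;

        StringBuilder sb = new StringBuilder();

        if (hours > 0)
            sb.append(hours).append('h');

        if (hours > 0 || mins > 0)
            sb.append(mins).append('m');

        return sb.append(secs).append('s').toString();
    }

    public static void main(String[] args) {
        System.out.println(durationToString(4_814_000L)); // 1h20m14s
        System.out.println(durationToString(61_000L));    // 1m1s
        System.out.println(durationToString(250L));       // 250ms
    }
}
```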

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
##########
@@ -1641,17 +1723,41 @@ private synchronized void partitionDone(UUID nodeId, int p, boolean own) {
             assert rmvd : "Partition already done [grp=" + grp.cacheOrGroupName() + ", fromNode=" + nodeId +
                 ", part=" + p + ", left=" + parts + "]";
 
-                if (rmvd)
-                    partitionsLeft.decrementAndGet();
+            if (rmvd)
+                partitionsLeft.decrementAndGet();
 
             if (parts.isEmpty()) {
                 int remainingRoutines = remaining.size() - 1;
 
-                U.log(log, "Completed " + ((remainingRoutines == 0 ? "(final) " : "") +
+                int fullParts = fullRebalancingParts.get(nodeId).size();
+                int histParts = histRebalancingParts.get(nodeId).size();
+
+                long fullEntries = fullReceivedKeys.get(nodeId).sum();
+                long histEntries = histReceivedKeys.get(nodeId).sum();
+
+                long fullBytes = fullReceivedBytes.get(nodeId).sum();
+                long histBytes = histReceivedBytes.get(nodeId).sum();
+
+                long duration = System.currentTimeMillis() - startTime;
+                long durationSec = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(duration));
+
+                U.log(log, "Completed " + (remainingRoutines == 0 ? "(final) " : "") +
                     "rebalancing [grp=" + grp.cacheOrGroupName() +
                     ", supplier=" + nodeId +
+                    ", partitions=" + (fullParts + histParts) +
+                    ", entries=" + (fullEntries + histEntries) +
+                    ", duration=" + duration + "ms" +
+                    ", bytesRcvd=" + U.humanReadableByteCount(fullBytes + histBytes) +
+                    ", avgSpeed=" + U.humanReadableByteCount((fullBytes + histBytes) / durationSec) + "/sec" +
+                    ", histPartitions=" + histParts +
+                    ", histEntries=" + histEntries +
+                    ", histBytesRcvd=" + U.humanReadableByteCount(histBytes) +
+                    ", fullPartitions=" + fullParts +
+                    ", fullEntries=" + fullEntries +
+                    ", fullBytesRcvd=" + U.humanReadableByteCount(fullBytes) +
                     ", topVer=" + topologyVersion() +
-                    ", progress=" + (routines - remainingRoutines) + "/" + routines + "]"));
+                    ", progress=" + (routines - remainingRoutines) + "/" + routines +
+                    ", rebalanceId=" + rebalanceId + "]");

Review comment:
       Let's put rebalanceId at the start of the log string.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

