keith-turner commented on a change in pull request #2132:
URL: https://github.com/apache/accumulo/pull/2132#discussion_r644308904



##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -85,31 +103,55 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());

Review comment:
       Using a count instead of time may simplify code.  Not sure if the 
following is correct, I think it may increment a count or set it to 1 if it 
does not exists.
   
   ```suggestion
         this.deadCompactions.merge(ecid, 1, Long::sum);
   ```

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -85,31 +103,55 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and 
should be failed.
     // Its possible that a compaction committed while going through the steps 
above, if so then
     // that is ok and marking it failed will end up being a no-op.
-    try {
-      coordinator.compactionFailed(tabletCompactions);
-    } catch (UnknownCompactionIdException e) {
-      // One or more Ids was not in the Running compaction list. This is ok to 
ignore.
-    }
+    long now = System.currentTimeMillis();
+    this.deadCompactions.forEach((eci, startTime) -> {
+      if ((now - startTime) > threshold) {
+        // Compaction believed to be dead for two cycles. Fail it.
+        try {
+          log.warn(
+              "Failing compaction {} which is believed to be dead. Last seen 
at {} and not seen since.",
+              eci, startTime);
+          coordinator.compactionFailed(tabletCompactions);
+          this.deadCompactions.remove(eci);
+        } catch (UnknownCompactionIdException e) {
+          // One or more Ids was not in the Running compaction list. This is 
ok to ignore.
+        }
+      }
+    });

Review comment:
       It would be good to only call `coordinator.compactionFailed()` once 
because it uses a batch writer in the impl instead of per entry.  The following 
may do this and uses a count.
   
   ```suggestion
       var compactionsToFail = deadCompactions.entrySet().stream().filter(e -> 
e.getValue() >= 2).map(e -> e.getKey()).collect(toSet());
      tabletCompactions.keySet().retainAll(compactionsToFail);
      coordinator.compactionFailed(tabletCompactions);
      deadCompactions.keySet().removeAll(compactionsToFail);
   ```

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -75,6 +82,17 @@ private void detectDeadCompactions() {
       tabletCompactions.forEach((ecid, extent) -> log.trace("Saw {} for {}", 
ecid, extent));
     }
 
+    // Remove from the dead map any compactions that the Tablet's
+    // do not think are running any more.
+    this.deadCompactions.keySet().forEach(eci -> {
+      if (!tabletCompactions.containsKey(eci)) {
+        if (this.deadCompactions.remove(eci) != null)
+          log.trace(
+              "Removed {} from the dead compaction map, no tablet thinks this 
compaction is running",
+              eci);
+      }
+    });

Review comment:
       I think the following does the same thing.
   
   ```suggestion
       this.deadCompactions.keySet().retailAll(tabletCompactions.keySet());
   ```

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+      this.deadCompactions.merge(ecid, 1L, Long::sum);
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and 
should be failed.
     // Its possible that a compaction committed while going through the steps 
above, if so then
     // that is ok and marking it failed will end up being a no-op.
+    Set<ExternalCompactionId> toFail =
+        this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 
2).map(e -> e.getKey())
+            .collect(Collectors.toCollection(TreeSet::new));
+    tabletCompactions.keySet().retainAll(toFail);
+    tabletCompactions.forEach((eci, v) -> {
+      log.warn("Compaction {} believed to be dead, failing it.", eci);

Review comment:
       Maybe make this info.  Thinking that compactor processes dying will be 
somewhat normal/expected so may not want to warn.
   
   ```suggestion
         log.info("Compaction {} believed to be dead, failing it.", eci);
   ```

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());

Review comment:
       Think this line can be removed
   
   ```suggestion
   ```

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+      this.deadCompactions.merge(ecid, 1L, Long::sum);
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and 
should be failed.
     // Its possible that a compaction committed while going through the steps 
above, if so then
     // that is ok and marking it failed will end up being a no-op.
+    Set<ExternalCompactionId> toFail =
+        this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 
2).map(e -> e.getKey())
+            .collect(Collectors.toCollection(TreeSet::new));
+    tabletCompactions.keySet().retainAll(toFail);
+    tabletCompactions.forEach((eci, v) -> {
+      log.warn("Compaction {} believed to be dead, failing it.", eci);

Review comment:
       Yeah I can see that case for making it warn.  I feel it should be INFO 
or WARN, but not sure which. WARN is fine w/ me.

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+      this.deadCompactions.merge(ecid, 1L, Long::sum);
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and 
should be failed.
     // Its possible that a compaction committed while going through the steps 
above, if so then
     // that is ok and marking it failed will end up being a no-op.
+    Set<ExternalCompactionId> toFail =
+        this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 
2).map(e -> e.getKey())
+            .collect(Collectors.toCollection(TreeSet::new));
+    tabletCompactions.keySet().retainAll(toFail);
+    tabletCompactions.forEach((eci, v) -> {
+      log.warn("Compaction {} believed to be dead, failing it.", eci);

Review comment:
       > Why would that be expected?
   
   I was thinking that on a large cluster, host and processes dying is 
expected.  In general I think Accumulo has a bit too much logging at the warn 
level for things that occur as a routine part of operating a cluster.  If so, 
maybe this makes people ignore warnings.
   
   >  It might not be obvious that if you are having issues with external 
compaction you need to set logging to trace to obtain enough info to figure out 
what the issue might be.
   
   I was thinking that detection of a dead compaction should be at WARN or 
INFO, but was not sure which after @dlmarion comment.  The stuff at trace is 
related to the source information on how it finds dead compactions and will 
routinely log information even when it does not detect a dead compaction.
   

##########
File path: 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
##########
@@ -84,21 +97,42 @@ private void detectDeadCompactions() {
 
     running.forEach((ecid) -> {
       if (tabletCompactions.remove(ecid) != null) {
-        log.trace("Removed {} running on a compactor", ecid);
+        log.trace("Removed compaction {} running on a compactor", ecid);
+      }
+      if (this.deadCompactions.remove(ecid) != null) {
+        log.trace("Removed {} from the dead compaction map, it's running on a 
compactor", ecid);
       }
     });
 
     // Determine which compactions are currently committing and remove those
     context.getAmple().getExternalCompactionFinalStates()
-        .map(ecfs -> 
ecfs.getExternalCompactionId()).forEach(tabletCompactions::remove);
+        .map(ecfs -> ecfs.getExternalCompactionId()).forEach(ecid -> {
+          if (tabletCompactions.remove(ecid) != null) {
+            log.trace("Removed compaction {} that is committing", ecid);
+          }
+          if (this.deadCompactions.remove(ecid) != null) {
+            log.trace("Removed {} from the dead compaction map, it's 
committing", ecid);
+          }
+        });
 
-    tabletCompactions
-        .forEach((ecid, extent) -> log.debug("Detected dead compaction {} {}", 
ecid, extent));
+    tabletCompactions.forEach((ecid, extent) -> {
+      log.debug("Possible dead compaction detected {} {}", ecid, extent);
+      this.deadCompactions.putIfAbsent(ecid, System.currentTimeMillis());
+      this.deadCompactions.merge(ecid, 1L, Long::sum);
+    });
 
     // Everything left in tabletCompactions is no longer running anywhere and 
should be failed.
     // Its possible that a compaction committed while going through the steps 
above, if so then
     // that is ok and marking it failed will end up being a no-op.
+    Set<ExternalCompactionId> toFail =
+        this.deadCompactions.entrySet().stream().filter(e -> e.getValue() > 
2).map(e -> e.getKey())
+            .collect(Collectors.toCollection(TreeSet::new));
+    tabletCompactions.keySet().retainAll(toFail);
+    tabletCompactions.forEach((eci, v) -> {
+      log.warn("Compaction {} believed to be dead, failing it.", eci);

Review comment:
       I like how the change logs at debug when it finds a candidate dead 
compaction and adds that to the map.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to