This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 202198a588 Updates compaction to use TabletLogger (#4333)
202198a588 is described below

commit 202198a588aae1096611ff9b5513075054e2a876
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Mon Mar 4 10:55:50 2024 -0500

    Updates compaction to use TabletLogger (#4333)
---
 .../apache/accumulo/core/logging/TabletLogger.java | 49 +++++++++++-----------
 .../coordinator/CompactionCoordinator.java         |  9 ++--
 .../coordinator/commit/CommitCompaction.java       |  4 ++
 .../manager/tableOps/compact/CompactionDriver.java | 22 ++++++++--
 test/src/main/resources/log4j2-test.properties     |  3 ++
 5 files changed, 55 insertions(+), 32 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
index 349d29b19f..e76c62a6c9 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java
@@ -27,14 +27,16 @@ import java.util.SortedSet;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
 import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -127,40 +129,36 @@ public class TabletLogger {
         cf -> CompactableFileImpl.toStoredTabletFile(cf).toMinimalString());
   }
 
-  public static void selected(KeyExtent extent, CompactionKind kind,
+  public static void selected(FateId fateId, KeyExtent extent,
       Collection<StoredTabletFile> inputs) {
-    fileLog.trace("{} changed compaction selection set for {} new set {}", extent, kind,
+    fileLog.trace("Selected files {} {} {}", extent, fateId,
         Collections2.transform(inputs, StoredTabletFile::toMinimalString));
   }
 
-  public static void compacting(KeyExtent extent, CompactionJob job, CompactionConfig config) {
+  public static void compacting(TabletMetadata tabletMetadata, ExternalCompactionId cid,
+      String compactorAddress, CompactionJob job) {
     if (fileLog.isDebugEnabled()) {
-      if (config == null) {
-        fileLog.debug("Compacting {} on {} for {} from {} size {}", extent, job.getGroup(),
-            job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles()));
+      if (job.getKind() == CompactionKind.USER) {
+        var fateId = tabletMetadata.getSelectedFiles().getFateId();
+        fileLog.debug(
+            "Compacting {} driver:{} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}",
+            tabletMetadata.getExtent(), fateId, cid, job.getGroup(), compactorAddress,
+            job.getPriority(), getSize(job.getFiles()), job.getKind(),
+            asMinimalString(job.getFiles()));
       } else {
-        fileLog.debug("Compacting {} on {} for {} from {} size {} config {}", extent,
-            job.getGroup(), job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles()),
-            config);
+        fileLog.debug(
+            "Compacting {} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}",
+            tabletMetadata.getExtent(), cid, job.getGroup(), compactorAddress, job.getPriority(),
+            getSize(job.getFiles()), job.getKind(), asMinimalString(job.getFiles()));
       }
     }
   }
 
-  public static void compacted(KeyExtent extent, CompactionJob job, StoredTabletFile output) {
-    fileLog.debug("Compacted {} for {} created {} from {}", extent, job.getKind(), output,
-        asMinimalString(job.getFiles()));
-  }
-
-  public static void compactionFailed(KeyExtent extent, CompactionJob job,
-      CompactionConfig config) {
-    fileLog.debug("Failed to compact: extent: {}, input files: {}, iterators: {}", extent,
-        asMinimalString(job.getFiles()), config.getIterators());
-  }
-
-  public static void externalCompactionFailed(KeyExtent extent, ExternalCompactionId id,
-      CompactionJob job, CompactionConfig config) {
-    fileLog.debug("Failed to compact: id: {}, extent: {}, input files: {}, iterators: {}", id,
-        extent, asMinimalString(job.getFiles()), config.getIterators());
+  public static void compacted(KeyExtent extent, ExternalCompactionId ecid, CompactionKind kind,
+      Collection<StoredTabletFile> inputs, Optional<ReferencedTabletFile> output) {
+    var transformed = Collections2.transform(inputs, StoredTabletFile::toMinimalString);
+    fileLog.debug("{} compacted {} for {} created {} from {}", ecid, extent, kind,
+        output.map(f -> f + "").orElse("no output"), transformed);
   }
 
   public static void flushed(KeyExtent extent, Optional<StoredTabletFile> newDatafile) {
@@ -198,4 +196,5 @@ public class TabletLogger {
   public static void walRefsChanged(KeyExtent extent, Collection<String> refsSupplier) {
     walsLog.trace("{} has unflushed data in wals: {} ", extent, refsSupplier);
   }
+
 }
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index a07ff50bc4..d36ec662d5 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -74,6 +74,7 @@ import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter;
 import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil;
+import org.apache.accumulo.core.logging.TabletLogger;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
@@ -347,10 +348,10 @@ public class CompactionCoordinator
 
       // Only reserve user compactions when the config is present. When compactions are canceled the
       // config is deleted.
+      var cid = ExternalCompactionId.from(externalCompactionId);
       if (kind == CompactionKind.SYSTEM
           || (kind == CompactionKind.USER && compactionConfig.isPresent())) {
-        ecm = reserveCompaction(metaJob, compactorAddress,
-            ExternalCompactionId.from(externalCompactionId));
+        ecm = reserveCompaction(metaJob, compactorAddress, cid);
       }
 
       if (ecm != null) {
@@ -359,8 +360,8 @@ public class CompactionCoordinator
         // is dead. In this cases the compaction is not actually running.
         RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()),
             new RunningCompaction(result, compactorAddress, groupName));
-        LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId,
-            compactorAddress, ecm.getJobFiles().size());
+        TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress,
+            metaJob.getJob());
         break;
       } else {
         LOG.debug(
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
index 2ff3a30386..cc0432d4a0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.logging.TabletLogger;
 import org.apache.accumulo.core.metadata.AbstractTabletFile;
 import org.apache.accumulo.core.metadata.ReferencedTabletFile;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -136,6 +137,9 @@ public class CommitCompaction extends ManagerRepo {
 
         var result = tabletsMutator.process().get(getExtent());
         if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
+          // Compaction was successfully committed to the tablet so log it
+          TabletLogger.compacted(getExtent(), ecid, commitData.kind, commitData.getJobFiles(),
+              newDatafile);
           break;
         } else {
           // compaction failed to commit, maybe something changed on the tablet so lets reread the
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index b3f373cb16..f8ad23172b 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -28,6 +28,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
 
 import java.time.Duration;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
@@ -44,6 +45,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.logging.TabletLogger;
 import org.apache.accumulo.core.metadata.AbstractTabletFile;
 import org.apache.accumulo.core.metadata.AccumuloTable;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -137,12 +139,24 @@ class CompactionDriver extends ManagerRepo {
 
     var ample = manager.getContext().getAmple();
 
-    // ELASTICITY_TODO use existing compaction logging
+    // This map tracks tablets that had a conditional mutation submitted to select files. If the
+    // conditional mutation is successful then want to log a message. Use a concurrent map as the
+    // result consumer may run in another thread.
+    ConcurrentHashMap<KeyExtent,Set<StoredTabletFile>> selectionsSubmitted =
+        new ConcurrentHashMap<>();
 
     Consumer<Ample.ConditionalResult> resultConsumer = result -> {
       if (result.getStatus() == Status.REJECTED) {
         log.debug("{} update for {} was rejected ", fateId, result.getExtent());
       }
+
+      // always remove extents from the map even if not successful in order to avoid placing too
+      // many in memory
+      var selected = selectionsSubmitted.remove(result.getExtent());
+      if (selected != null && result.getStatus() == Status.ACCEPTED) {
+        // successfully selected files so log this
+        TabletLogger.selected(fateId, result.getExtent(), selected);
+      }
     };
 
     long t1 = System.currentTimeMillis();
@@ -229,6 +243,8 @@ class CompactionDriver extends ManagerRepo {
 
             mutator.putSelectedFiles(selectedFiles);
 
+            selectionsSubmitted.put(tablet.getExtent(), filesToCompact);
+
             mutator.submit(tabletMetadata -> tabletMetadata.getSelectedFiles() != null
                 && tabletMetadata.getSelectedFiles().getMetadataValue()
                     .equals(selectedFiles.getMetadataValue()));
@@ -261,7 +277,7 @@ class CompactionDriver extends ManagerRepo {
           // If there are compactions preventing selection of files, then add
           // selecting marker that prevents new compactions from starting
           if (!tablet.getUserCompactionsRequested().contains(fateId)) {
-            log.debug(
+            log.trace(
                 "Another compaction exists for {}, Marking {} as needing a user requested compaction",
                 tablet.getExtent(), fateId);
             var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation()
@@ -271,7 +287,7 @@ class CompactionDriver extends ManagerRepo {
             userCompactionRequested++;
           } else {
             // Marker was already added and we are waiting
-            log.debug("Waiting on {} for previously marked user requested compaction {} to run",
+            log.trace("Waiting on {} for previously marked user requested compaction {} to run",
                 tablet.getExtent(), fateId);
             userCompactionWaiting++;
           }
diff --git a/test/src/main/resources/log4j2-test.properties b/test/src/main/resources/log4j2-test.properties
index 70276bda59..7a92825296 100644
--- a/test/src/main/resources/log4j2-test.properties
+++ b/test/src/main/resources/log4j2-test.properties
@@ -142,6 +142,9 @@ logger.38.level = debug
 logger.39.name = org.apache.accumulo.manager.Manager
 logger.39.level = trace
 
+logger.40.name = org.apache.accumulo.tablet
+logger.40.level = trace
+
 property.metricsFilename = ./target/test-metrics
 
 # appender.metrics.type = Console

Reply via email to