This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 202198a588 Updates compaction to use TabletLogger (#4333) 202198a588 is described below commit 202198a588aae1096611ff9b5513075054e2a876 Author: Keith Turner <ktur...@apache.org> AuthorDate: Mon Mar 4 10:55:50 2024 -0500 Updates compaction to use TabletLogger (#4333) --- .../apache/accumulo/core/logging/TabletLogger.java | 49 +++++++++++----------- .../coordinator/CompactionCoordinator.java | 9 ++-- .../coordinator/commit/CommitCompaction.java | 4 ++ .../manager/tableOps/compact/CompactionDriver.java | 22 ++++++++-- test/src/main/resources/log4j2-test.properties | 3 ++ 5 files changed, 55 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java index 349d29b19f..e76c62a6c9 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java @@ -27,14 +27,16 @@ import java.util.SortedSet; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -127,40 +129,36 @@ public class TabletLogger { cf -> CompactableFileImpl.toStoredTabletFile(cf).toMinimalString()); } - public static void selected(KeyExtent extent, CompactionKind kind, + public static void selected(FateId fateId, KeyExtent extent, Collection<StoredTabletFile> inputs) { - fileLog.trace("{} changed compaction selection set for {} new set {}", extent, kind, + fileLog.trace("Selected files {} {} {}", extent, fateId, Collections2.transform(inputs, StoredTabletFile::toMinimalString)); } - public static void compacting(KeyExtent extent, CompactionJob job, CompactionConfig config) { + public static void compacting(TabletMetadata tabletMetadata, ExternalCompactionId cid, + String compactorAddress, CompactionJob job) { if (fileLog.isDebugEnabled()) { - if (config == null) { - fileLog.debug("Compacting {} on {} for {} from {} size {}", extent, job.getGroup(), - job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles())); + if (job.getKind() == CompactionKind.USER) { + var fateId = tabletMetadata.getSelectedFiles().getFateId(); + fileLog.debug( + "Compacting {} driver:{} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}", + tabletMetadata.getExtent(), fateId, cid, job.getGroup(), compactorAddress, + job.getPriority(), getSize(job.getFiles()), job.getKind(), + asMinimalString(job.getFiles())); } else { - fileLog.debug("Compacting {} on {} for {} from {} size {} config {}", extent, - job.getGroup(), job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles()), - config); + fileLog.debug( + "Compacting {} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}", + tabletMetadata.getExtent(), cid, job.getGroup(), compactorAddress, job.getPriority(), + getSize(job.getFiles()), job.getKind(), asMinimalString(job.getFiles())); } } } - public static void compacted(KeyExtent extent, CompactionJob job, StoredTabletFile output) { - fileLog.debug("Compacted {} for {} created {} from {}", extent, job.getKind(), output, - asMinimalString(job.getFiles())); - } - - public static void compactionFailed(KeyExtent extent, CompactionJob job, - CompactionConfig config) { - fileLog.debug("Failed to compact: extent: {}, input files: {}, iterators: {}", extent, - asMinimalString(job.getFiles()), config.getIterators()); - } - - public static void externalCompactionFailed(KeyExtent extent, ExternalCompactionId id, - CompactionJob job, CompactionConfig config) { - fileLog.debug("Failed to compact: id: {}, extent: {}, input files: {}, iterators: {}", id, - extent, asMinimalString(job.getFiles()), config.getIterators()); + public static void compacted(KeyExtent extent, ExternalCompactionId ecid, CompactionKind kind, + Collection<StoredTabletFile> inputs, Optional<ReferencedTabletFile> output) { + var transformed = Collections2.transform(inputs, StoredTabletFile::toMinimalString); + fileLog.debug("{} compacted {} for {} created {} from {}", ecid, extent, kind, + output.map(f -> f + "").orElse("no output"), transformed); } public static void flushed(KeyExtent extent, Optional<StoredTabletFile> newDatafile) { @@ -198,4 +196,5 @@ public class TabletLogger { public static void walRefsChanged(KeyExtent extent, Collection<String> refsSupplier) { walsLog.trace("{} has unflushed data in wals: {} ", extent, refsSupplier); } + } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index a07ff50bc4..d36ec662d5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -74,6 +74,7 @@ import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.ReferencedTabletFile; @@ -347,10 +348,10 @@ public class CompactionCoordinator // Only reserve user compactions when the config is present. When compactions are canceled the // config is deleted. + var cid = ExternalCompactionId.from(externalCompactionId); if (kind == CompactionKind.SYSTEM || (kind == CompactionKind.USER && compactionConfig.isPresent())) { - ecm = reserveCompaction(metaJob, compactorAddress, - ExternalCompactionId.from(externalCompactionId)); + ecm = reserveCompaction(metaJob, compactorAddress, cid); } if (ecm != null) { @@ -359,8 +360,8 @@ public class CompactionCoordinator // is dead. In this cases the compaction is not actually running. RUNNING_CACHE.put(ExternalCompactionId.of(result.getExternalCompactionId()), new RunningCompaction(result, compactorAddress, groupName)); - LOG.debug("Returning external job {} to {} with {} files", result.externalCompactionId, - compactorAddress, ecm.getJobFiles().size()); + TabletLogger.compacting(metaJob.getTabletMetadata(), cid, compactorAddress, + metaJob.getJob()); break; } else { LOG.debug( diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java index 2ff3a30386..cc0432d4a0 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/commit/CommitCompaction.java @@ -34,6 +34,7 @@ import java.util.stream.Collectors; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -136,6 +137,9 @@ public class CommitCompaction extends ManagerRepo { var result = tabletsMutator.process().get(getExtent()); if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + // Compaction was successfully committed to the tablet so log it + TabletLogger.compacted(getExtent(), ecid, commitData.kind, commitData.getJobFiles(), + newDatafile); break; } else { // compaction failed to commit, maybe something changed on the tablet so lets reread the diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java index b3f373cb16..f8ad23172b 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java @@ -28,6 +28,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import java.time.Duration; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.function.Predicate; @@ -44,6 +45,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.logging.TabletLogger; import org.apache.accumulo.core.metadata.AbstractTabletFile; import org.apache.accumulo.core.metadata.AccumuloTable; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -137,12 +139,24 @@ class CompactionDriver extends ManagerRepo { var ample = manager.getContext().getAmple(); - // ELASTICITY_TODO use existing compaction logging + // This map tracks tablets that had a conditional mutation submitted to select files. If the + // conditional mutation is successful then want to log a message. Use a concurrent map as the + // result consumer may run in another thread. + ConcurrentHashMap<KeyExtent,Set<StoredTabletFile>> selectionsSubmitted = + new ConcurrentHashMap<>(); Consumer<Ample.ConditionalResult> resultConsumer = result -> { if (result.getStatus() == Status.REJECTED) { log.debug("{} update for {} was rejected ", fateId, result.getExtent()); } + + // always remove extents from the map even if not successful in order to avoid placing too + // many in memory + var selected = selectionsSubmitted.remove(result.getExtent()); + if (selected != null && result.getStatus() == Status.ACCEPTED) { + // successfully selected files so log this + TabletLogger.selected(fateId, result.getExtent(), selected); + } }; long t1 = System.currentTimeMillis(); @@ -229,6 +243,8 @@ class CompactionDriver extends ManagerRepo { mutator.putSelectedFiles(selectedFiles); + selectionsSubmitted.put(tablet.getExtent(), filesToCompact); + mutator.submit(tabletMetadata -> tabletMetadata.getSelectedFiles() != null && tabletMetadata.getSelectedFiles().getMetadataValue() .equals(selectedFiles.getMetadataValue())); @@ -261,7 +277,7 @@ class CompactionDriver extends ManagerRepo { // If there are compactions preventing selection of files, then add // selecting marker that prevents new compactions from starting if (!tablet.getUserCompactionsRequested().contains(fateId)) { - log.debug( + log.trace( "Another compaction exists for {}, Marking {} as needing a user requested compaction", tablet.getExtent(), fateId); var mutator = tabletsMutator.mutateTablet(tablet.getExtent()).requireAbsentOperation() @@ -271,7 +287,7 @@ class CompactionDriver extends ManagerRepo { userCompactionRequested++; } else { // Marker was already added and we are waiting - log.debug("Waiting on {} for previously marked user requested compaction {} to run", + log.trace("Waiting on {} for previously marked user requested compaction {} to run", tablet.getExtent(), fateId); userCompactionWaiting++; } diff --git a/test/src/main/resources/log4j2-test.properties b/test/src/main/resources/log4j2-test.properties index 70276bda59..7a92825296 100644 --- a/test/src/main/resources/log4j2-test.properties +++ b/test/src/main/resources/log4j2-test.properties @@ -142,6 +142,9 @@ logger.38.level = debug logger.39.name = org.apache.accumulo.manager.Manager logger.39.level = trace +logger.40.name = org.apache.accumulo.tablet +logger.40.level = trace + property.metricsFilename = ./target/test-metrics # appender.metrics.type = Console