This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 681139e Create RunningCompactionInfo in core for reuse (#2403) 681139e is described below commit 681139e786787bdc5c9778bbb2a6062962e63f51 Author: Mike Miller <mmil...@apache.org> AuthorDate: Mon Jan 3 08:57:15 2022 -0500 Create RunningCompactionInfo in core for reuse (#2403) * Create new object in core for reuse * Make private vars of CompactorInfo final --- .../core/util/compaction/RunningCompaction.java | 6 +- .../util/compaction/RunningCompactionInfo.java | 93 +++++++++++----------- .../rest/compactions/external/CompactorInfo.java | 8 +- .../rest/compactions/external/ECResource.java | 5 +- .../compactions/external/RunningCompactions.java | 6 +- .../external/RunningCompactorDetails.java | 15 ++-- .../compaction/ExternalCompactionProgressIT.java | 8 +- 7 files changed, 69 insertions(+), 72 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java index 5ed976d..d057da1 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompaction.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.TreeMap; import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; public class RunningCompaction { @@ -32,12 +33,15 @@ public class RunningCompaction { private final Map<Long,TCompactionStatusUpdate> updates = new TreeMap<>(); public RunningCompaction(TExternalCompactionJob job, String compactorAddress, String queueName) { - super(); this.job = job; this.compactorAddress = compactorAddress; this.queueName = queueName; } + public RunningCompaction(TExternalCompaction tEC) { + this(tEC.getJob(), tEC.getCompactor(), tEC.getQueueName()); + } + public Map<Long,TCompactionStatusUpdate> getUpdates() { return updates; } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java similarity index 67% rename from server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java rename to core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java index 7ef070f..4f67417 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorInfo.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/RunningCompactionInfo.java @@ -16,60 +16,54 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.monitor.rest.compactions.external; +package org.apache.accumulo.core.util.compaction; + +import static java.util.Objects.requireNonNull; -import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; -import jakarta.validation.constraints.NotNull; - import org.apache.accumulo.core.compaction.thrift.TCompactionStatusUpdate; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RunningCompactorInfo extends CompactorInfo { - private static final Logger log = LoggerFactory.getLogger(RunningCompactorInfo.class); +public class RunningCompactionInfo { + private static final Logger log = LoggerFactory.getLogger(RunningCompactionInfo.class); - // Variable names become JSON keys - public String ecid; - public String kind; - public String tableId; - public int numFiles; - public float progress = 0f; - public long duration; - public String status; - public long lastUpdate; + // DO NOT CHANGE Variable names - they map to JSON keys in the Monitor + public final String server; + public final String queueName; + public final String ecid; + public final String kind; + public final String tableId; + public final int numFiles; + public final float progress; + public final long duration; + public final String status; + public final long lastUpdate; - public RunningCompactorInfo() { - super(); - } + /** + * Info parsed about the external running compaction. Calculate the progress, which is defined as + * the percentage of bytesRead / bytesToBeCompacted of the last update. + */ + public RunningCompactionInfo(TExternalCompaction ec) { + requireNonNull(ec, "Thrift external compaction is null."); + var updates = requireNonNull(ec.getUpdates(), "Missing Thrift external compaction updates"); + var job = requireNonNull(ec.getJob(), "Thrift external compaction job is null"); - public RunningCompactorInfo(long fetchedTime, String ecid, @NotNull TExternalCompaction ec) { - super(fetchedTime, ec.getQueueName(), ec.getCompactor()); - this.ecid = ecid; - var updates = ec.getUpdates(); - var job = ec.getJob(); + server = ec.getCompactor(); + queueName = ec.getQueueName(); + ecid = job.getExternalCompactionId(); kind = job.getKind().name(); - tableId = KeyExtent.fromThrift(job.extent).tableId().canonical(); - numFiles = job.files.size(); - updateProgress(updates); - log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress); - } + tableId = KeyExtent.fromThrift(job.getExtent()).tableId().canonical(); + numFiles = job.getFiles().size(); - /** - * Calculate progress: the percentage of bytesRead out of bytesToBeCompacted of the last update. - * Also update the status. - */ - private void updateProgress(Map<Long,TCompactionStatusUpdate> updates) { - if (updates.isEmpty()) { - progress = 0f; - status = "na"; - } + // parse the updates map long nowMillis = System.currentTimeMillis(); long startedMillis = nowMillis; + float percent = 0f; long updateMillis; TCompactionStatusUpdate last; @@ -92,28 +86,33 @@ public class RunningCompactorInfo extends CompactorInfo { updateMillis = lastEntry.getKey(); } else { log.debug("No updates found for {}", ecid); + lastUpdate = nowMillis; + progress = percent; + status = "na"; return; } long sinceLastUpdateSeconds = TimeUnit.MILLISECONDS.toSeconds(nowMillis - updateMillis); log.debug("Time since Last update {} - {} = {} seconds", nowMillis, updateMillis, sinceLastUpdateSeconds); - if (sinceLastUpdateSeconds > 30) { - log.debug("Compaction hasn't progressed from {} in {} seconds.", progress, - sinceLastUpdateSeconds); - } - float percent; var total = last.getEntriesToBeCompacted(); - if (total <= 0) { - percent = 0f; - } else { + if (total > 0) { percent = (last.getEntriesRead() / (float) total) * 100; } - lastUpdate = nowMillis - updateMillis; - status = last.state.name(); progress = percent; + + if (updates.isEmpty()) { + status = "na"; + } else { + status = last.state.name(); + } + log.debug("Parsed running compaction {} for {} with progress = {}%", status, ecid, progress); + if (sinceLastUpdateSeconds > 30) { + log.debug("Compaction hasn't progressed from {} in {} seconds.", progress, + sinceLastUpdateSeconds); + } } @Override diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java index a7dc56a..5e7c36f 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/CompactorInfo.java @@ -21,11 +21,9 @@ package org.apache.accumulo.monitor.rest.compactions.external; public class CompactorInfo { // Variable names become JSON keys - public long lastContact = 0L; - public String server = ""; - public String queueName = ""; - - public CompactorInfo() {} + public final long lastContact; + public final String server; + public final String queueName; public CompactorInfo(long fetchedTimeMillis, String queue, String hostAndPort) { lastContact = System.currentTimeMillis() - fetchedTimeMillis; diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java index 131abc8..b17a702 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/ECResource.java @@ -77,10 +77,9 @@ public class ECResource { ecMap = monitor.fetchRunningInfo(); externalCompaction = ecMap.get(ecid); if (externalCompaction == null) { - log.warn("Failed to find details for ECID: {}", ecid); - return new RunningCompactorDetails(); + throw new IllegalStateException("Failed to find details for ECID: " + ecid); } } - return new RunningCompactorDetails(System.currentTimeMillis(), ecid, externalCompaction); + return new RunningCompactorDetails(externalCompaction); } } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java index 294b91c..1f813f1 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactions.java @@ -23,16 +23,16 @@ import java.util.List; import java.util.Map; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; +import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; public class RunningCompactions { - public final List<RunningCompactorInfo> running = new ArrayList<>(); + public final List<RunningCompactionInfo> running = new ArrayList<>(); public RunningCompactions(Map<String,TExternalCompaction> rMap) { if (rMap != null) { - var fetchedTime = System.currentTimeMillis(); for (var entry : rMap.entrySet()) { - running.add(new RunningCompactorInfo(fetchedTime, entry.getKey(), entry.getValue())); + running.add(new RunningCompactionInfo(entry.getValue())); } } } diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java index c6c3af2..ad6cbff 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/rest/compactions/external/RunningCompactorDetails.java @@ -23,18 +23,15 @@ import java.util.List; import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.tabletserver.thrift.InputFile; +import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; -public class RunningCompactorDetails extends RunningCompactorInfo { +public class RunningCompactorDetails extends RunningCompactionInfo { // Variable names become JSON keys - public List<CompactionInputFile> inputFiles = new ArrayList<>(); - public String outputFile; + public final List<CompactionInputFile> inputFiles; + public final String outputFile; - public RunningCompactorDetails() { - super(); - } - - public RunningCompactorDetails(long fetchedTime, String ecid, TExternalCompaction ec) { - super(fetchedTime, ecid, ec); + public RunningCompactorDetails(TExternalCompaction ec) { + super(ec); var job = ec.getJob(); inputFiles = convertInputFiles(job.files); outputFile = job.outputFile; diff --git a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java index 20f974c..27004d3 100644 --- a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java +++ b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java @@ -36,11 +36,11 @@ import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.compaction.thrift.TCompactionState; import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.util.compaction.RunningCompactionInfo; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; -import org.apache.accumulo.monitor.rest.compactions.external.RunningCompactorInfo; import org.apache.accumulo.test.functional.SlowIterator; import org.apache.hadoop.conf.Configuration; import org.apache.thrift.TException; @@ -63,7 +63,7 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { STARTED, QUARTER, HALF, THREE_QUARTERS } - Map<String,RunningCompactorInfo> runningMap = new HashMap<>(); + Map<String,RunningCompactionInfo> runningMap = new HashMap<>(); List<EC_PROGRESS> progressList = new ArrayList<>(); private final AtomicBoolean compactionFinished = new AtomicBoolean(false); @@ -128,8 +128,8 @@ public class ExternalCompactionProgressIT extends AccumuloClusterHarness { if (ecMap != null) { ecMap.forEach((ecid, ec) -> { // returns null if it's a new mapping - RunningCompactorInfo rci = new RunningCompactorInfo(System.currentTimeMillis(), ecid, ec); - RunningCompactorInfo previousRci = runningMap.put(ecid, rci); + RunningCompactionInfo rci = new RunningCompactionInfo(ec); + RunningCompactionInfo previousRci = runningMap.put(ecid, rci); if (previousRci == null) { log.debug("New ECID {} with inputFiles: {}", ecid, rci.numFiles); } else {