keith-turner commented on code in PR #5185:
URL: https://github.com/apache/accumulo/pull/5185#discussion_r1885812079
##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -528,82 +551,138 @@ protected CompactionMetadata
createExternalCompactionMetadata(CompactionJob job,
}
- protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob
metaJob,
- String compactorAddress, ExternalCompactionId externalCompactionId) {
+ private class ReserveCompactionTask implements Supplier<CompactionMetadata> {
+
+ // Use a soft reference for this in case free memory gets low while this
is sitting in the queue
+ // waiting to process. This object can contain the tablets list of files
and if there are lots
+ // of tablet with lots of files then that could start to cause memory
problems.
+ private final SoftReference<CompactionJobQueues.MetaJob> metaJobRef;
+ private final String compactorAddress;
+ private final ExternalCompactionId externalCompactionId;
+
+ private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String
compactorAddress,
+ ExternalCompactionId externalCompactionId) {
+ Preconditions.checkArgument(metaJob.getJob().getKind() ==
CompactionKind.SYSTEM
+ || metaJob.getJob().getKind() == CompactionKind.USER);
+ this.metaJobRef = new SoftReference<>(Objects.requireNonNull(metaJob));
+ this.compactorAddress = Objects.requireNonNull(compactorAddress);
+ this.externalCompactionId = Objects.requireNonNull(externalCompactionId);
+
Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress));
+ }
- Preconditions.checkArgument(metaJob.getJob().getKind() ==
CompactionKind.SYSTEM
- || metaJob.getJob().getKind() == CompactionKind.USER);
+ @Override
+ public CompactionMetadata get() {
+ try {
+ var metaJob = metaJobRef.get();
+ if (metaJob == null) {
+ LOG.warn("Compaction reservation request for {} {} was garbage
collected.",
+ compactorAddress, externalCompactionId);
+ return null;
+ }
- var tabletMetadata = metaJob.getTabletMetadata();
+ var tabletMetadata = metaJob.getTabletMetadata();
- var jobFiles =
metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile)
- .collect(Collectors.toSet());
+ var jobFiles = metaJob.getJob().getFiles().stream()
+
.map(CompactableFileImpl::toStoredTabletFile).collect(Collectors.toSet());
- Retry retry =
Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
-
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
- .logInterval(Duration.ofMinutes(3)).createRetry();
+ Retry retry =
Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
+
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5)
+ .logInterval(Duration.ofMinutes(3)).createRetry();
- while (retry.canRetry()) {
- try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
- var extent = metaJob.getTabletMetadata().getExtent();
+ while (retry.canRetry()) {
+ try (var tabletsMutator =
ctx.getAmple().conditionallyMutateTablets()) {
+ var extent = metaJob.getTabletMetadata().getExtent();
- if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(),
jobFiles, ctx,
- manager.getSteadyTime())) {
- return null;
- }
+ if (!canReserveCompaction(tabletMetadata,
metaJob.getJob().getKind(), jobFiles, ctx,
+ manager.getSteadyTime())) {
+ return null;
+ }
- var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles,
tabletMetadata,
- compactorAddress, externalCompactionId);
-
- // any data that is read from the tablet to make a decision about if
it can compact or not
- // must be checked for changes in the conditional mutation.
- var tabletMutator =
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
- .requireFiles(jobFiles).requireNotCompacting(jobFiles);
- if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
- // For system compactions the user compaction requested column is
examined when deciding
- // if a compaction can start so need to check for changes to this
column.
- tabletMutator.requireSame(tabletMetadata, SELECTED,
USER_COMPACTION_REQUESTED);
- } else {
- tabletMutator.requireSame(tabletMetadata, SELECTED);
- }
+ var ecm = createExternalCompactionMetadata(metaJob.getJob(),
jobFiles, tabletMetadata,
+ compactorAddress, externalCompactionId);
+
+ // any data that is read from the tablet to make a decision about
if it can compact or
+ // not
+ // must be checked for changes in the conditional mutation.
+ var tabletMutator =
tabletsMutator.mutateTablet(extent).requireAbsentOperation()
+ .requireFiles(jobFiles).requireNotCompacting(jobFiles);
+ if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
+ // For system compactions the user compaction requested column
is examined when
+ // deciding
+ // if a compaction can start so need to check for changes to
this column.
+ tabletMutator.requireSame(tabletMetadata, SELECTED,
USER_COMPACTION_REQUESTED);
+ } else {
+ tabletMutator.requireSame(tabletMetadata, SELECTED);
+ }
- if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
- var selectedFiles = tabletMetadata.getSelectedFiles();
- var reserved = getFilesReservedBySelection(tabletMetadata,
manager.getSteadyTime(), ctx);
-
- // If there is a selectedFiles column, and the reserved set is empty
this means that
- // either no user jobs were completed yet or the selection
expiration time has passed
- // so the column is eligible to be deleted so a system job can run
instead
- if (selectedFiles != null && reserved.isEmpty()
- && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
- LOG.debug("Deleting user compaction selected files for {} {}",
extent,
- externalCompactionId);
- tabletMutator.deleteSelectedFiles();
- }
- }
+ if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
+ var selectedFiles = tabletMetadata.getSelectedFiles();
+ var reserved =
+ getFilesReservedBySelection(tabletMetadata,
manager.getSteadyTime(), ctx);
+
+ // If there is a selectedFiles column, and the reserved set is
empty this means that
+ // either no user jobs were completed yet or the selection
expiration time has passed
+ // so the column is eligible to be deleted so a system job can
run instead
+ if (selectedFiles != null && reserved.isEmpty()
+ && !Collections.disjoint(jobFiles,
selectedFiles.getFiles())) {
+ LOG.debug("Deleting user compaction selected files for {} {}",
extent,
+ externalCompactionId);
+ tabletMutator.deleteSelectedFiles();
+ }
+ }
- tabletMutator.putExternalCompaction(externalCompactionId, ecm);
- tabletMutator.submit(tm ->
tm.getExternalCompactions().containsKey(externalCompactionId));
+ tabletMutator.putExternalCompaction(externalCompactionId, ecm);
+ tabletMutator
+ .submit(tm ->
tm.getExternalCompactions().containsKey(externalCompactionId));
- var result = tabletsMutator.process().get(extent);
+ var result = tabletsMutator.process().get(extent);
- if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
- return ecm;
- } else {
- tabletMetadata = result.readMetadata();
+ if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED)
{
+ return ecm;
+ } else {
+ tabletMetadata = result.readMetadata();
+ }
+ }
+
+ retry.useRetry();
+ try {
+ retry.waitForNextAttempt(LOG,
+ "Reserved compaction for " +
metaJob.getTabletMetadata().getExtent());
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
- }
- retry.useRetry();
- try {
- retry.waitForNextAttempt(LOG,
- "Reserved compaction for " +
metaJob.getTabletMetadata().getExtent());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
+ return null;
+ } finally {
+
Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress));
}
}
+ }
+
+ protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob
metaJob,
+ String compactorAddress, ExternalCompactionId externalCompactionId) {
+
+ if (activeCompactorReservationRequest.contains(compactorAddress)) {
Review Comment:
Yeah, if something really unusual is going on, that Preconditions check would
catch it. In the normal case of a compactor retrying, the first check will catch
it. It would probably be good to add some info to the Preconditions check in case
it does fire.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]