This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new 5e4e30b68b fixes #3045 remove stale compactions from coordinator (#3059) 5e4e30b68b is described below commit 5e4e30b68bf14b82b48442cab33d1bd58b943a89 Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Nov 2 16:55:55 2022 +0000 fixes #3045 remove stale compactions from coordinator (#3059) --- pom.xml | 2 +- .../coordinator/CompactionCoordinator.java | 94 +++++++++++++++------- .../coordinator/CompactionCoordinatorTest.java | 64 ++++++++++++++- 3 files changed, 131 insertions(+), 29 deletions(-) diff --git a/pom.xml b/pom.xml index 96833e5eb3..abc036686a 100644 --- a/pom.xml +++ b/pom.xml @@ -1701,7 +1701,7 @@ <jdk>[17,)</jdk> </activation> <properties> - <extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/jav [...] + <extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED --add-opens java.base/java.net=ALL-UNNAMED --add-opens java.management/java.lang.management=ALL-UNNAMED --add-opens java.management/sun.management=ALL-UNNAMED --add-opens java.base/java.security=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED --add-opens java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/jav [...] 
</properties> </profile> </profiles> diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 27e4881143..232a6066b8 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.accumulo.coordinator.QueueSummaries.PrioTserver; import org.apache.accumulo.core.Constants; @@ -55,7 +56,9 @@ import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.fate.zookeeper.ServiceLock; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; @@ -90,6 +93,7 @@ import org.slf4j.LoggerFactory; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Sets; public class CompactionCoordinator extends AbstractServer implements CompactionCoordinatorService.Iface, LiveTServerSet.Listener { @@ -100,8 +104,14 @@ public class CompactionCoordinator extends AbstractServer protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries(); - /* Map of compactionId to 
RunningCompactions */ - protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING = + /* + * Map of compactionId to RunningCompactions. This is an informational cache of what external + * compactions may be running. It's possible it may contain external compactions that are not + * actually running. It may not contain compactions that are actually running. The metadata table + * is the most authoritative source of what external compactions are currently running, but it + * does not have the stats that this map has. + */ + protected static final Map<ExternalCompactionId,RunningCompaction> RUNNING_CACHE = new ConcurrentHashMap<>(); private static final Cache<ExternalCompactionId,RunningCompaction> COMPLETED = @@ -137,6 +147,7 @@ public class CompactionCoordinator extends AbstractServer startGCLogger(schedExecutor); printStartupMsg(); startCompactionCleaner(schedExecutor); + startRunningCleaner(schedExecutor); } @Override @@ -170,6 +181,12 @@ public class CompactionCoordinator extends AbstractServer ThreadPools.watchNonCriticalScheduledTask(future); } + protected void startRunningCleaner(ScheduledThreadPoolExecutor schedExecutor) { + ScheduledFuture<?> future = + schedExecutor.scheduleWithFixedDelay(this::cleanUpRunning, 0, 5, TimeUnit.MINUTES); + ThreadPools.watchNonCriticalScheduledTask(future); + } + protected void printStartupMsg() { LOG.info("Version " + Constants.VERSION); LOG.info("Instance " + getContext().getInstanceID()); @@ -277,7 +294,7 @@ public class CompactionCoordinator extends AbstractServer update.setState(TCompactionState.IN_PROGRESS); update.setMessage("Coordinator restarted, compaction found in progress"); rc.addUpdate(System.currentTimeMillis(), update); - RUNNING.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc); + RUNNING_CACHE.put(ExternalCompactionId.of(rc.getJob().getExternalCompactionId()), rc); }); } @@ -446,7 +463,10 @@ public class CompactionCoordinator extends AbstractServer prioTserver =
QUEUE_SUMMARIES.getNextTserver(queue); continue; } - RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), + // It is possible that by the time this is added, the tablet has already canceled the + // compaction or the compactor that made this request is dead. In these cases the compaction + // is not actually running. + RUNNING_CACHE.put(ExternalCompactionId.of(job.getExternalCompactionId()), new RunningCompaction(job, compactorAddress, queue)); LOG.debug("Returning external job {} to {}", job.externalCompactionId, compactorAddress); result = job; @@ -523,15 +543,7 @@ public class CompactionCoordinator extends AbstractServer // It's possible that RUNNING might not have an entry for this ecid in the case // of a coordinator restart when the Coordinator can't find the TServer for the // corresponding external compaction. - final RunningCompaction rc = RUNNING.get(ecid); - if (null != rc) { - RUNNING.remove(ecid, rc); - COMPLETED.put(ecid, rc); - } else { - LOG.warn( - "Compaction completed called by Compactor for {}, but no running compaction for that id.", - externalCompactionId); - } + recordCompletion(ecid); } @Override @@ -549,17 +561,7 @@ public class CompactionCoordinator extends AbstractServer void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) { compactionFinalizer.failCompactions(compactions); - compactions.forEach((k, v) -> { - final RunningCompaction rc = RUNNING.get(k); - if (null != rc) { - RUNNING.remove(k, rc); - COMPLETED.put(k, rc); - } else { - LOG.warn( - "Compaction failed called by Compactor for {}, but no running compaction for that id.", - k); - } - }); + compactions.forEach((k, v) -> recordCompletion(k)); } /** @@ -589,12 +591,49 @@ public class CompactionCoordinator extends AbstractServer } LOG.debug("Compaction status update, id: {}, timestamp: {}, update: {}", externalCompactionId, timestamp, update); - final RunningCompaction rc = RUNNING.get(ExternalCompactionId.of(externalCompactionId)); + final
RunningCompaction rc = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); if (null != rc) { rc.addUpdate(timestamp, update); } } + private void recordCompletion(ExternalCompactionId ecid) { + var rc = RUNNING_CACHE.remove(ecid); + if (rc != null) { + COMPLETED.put(ecid, rc); + } + } + protected Set<ExternalCompactionId> readExternalCompactionIds() { + return getContext().getAmple().readTablets().forLevel(Ample.DataLevel.USER) + .fetch(TabletMetadata.ColumnType.ECOMP).build().stream() + .flatMap(tm -> tm.getExternalCompactions().keySet().stream()).collect(Collectors.toSet()); + } + /** + * The RUNNING_CACHE set may contain external compactions that are not actually running. This + * method periodically cleans those up. + */ + protected void cleanUpRunning() { + + // grab a snapshot of the ids in the set before reading the metadata table. This is done to + // avoid removing things that are added while reading the metadata. + Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet()); + + // grab the ids that are listed as running in the metadata table. It is important that this is done + // after getting the snapshot.
+ Set<ExternalCompactionId> idsInMetadata = readExternalCompactionIds(); + + var idsToRemove = Sets.difference(idsSnapshot, idsInMetadata); + + // remove ids that are in the running set but not in the metadata table + idsToRemove.forEach(ecid -> recordCompletion(ecid)); + + if (idsToRemove.size() > 0) { + LOG.debug("Removed stale entries from RUNNING_CACHE : {}", idsToRemove); + } + } + /** * Return information about running compactions * @@ -614,8 +653,9 @@ public class CompactionCoordinator extends AbstractServer throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException(); } + final TExternalCompactionList result = new TExternalCompactionList(); - RUNNING.forEach((ecid, rc) -> { + RUNNING_CACHE.forEach((ecid, rc) -> { TExternalCompaction trc = new TExternalCompaction(); trc.setQueueName(rc.getQueueName()); trc.setCompactor(rc.getCompactorAddress()); @@ -660,7 +700,7 @@ public class CompactionCoordinator extends AbstractServer @Override public void cancel(TInfo tinfo, TCredentials credentials, String externalCompactionId) throws TException { - var runningCompaction = RUNNING.get(ExternalCompactionId.of(externalCompactionId)); + var runningCompaction = RUNNING_CACHE.get(ExternalCompactionId.of(externalCompactionId)); var extent = KeyExtent.fromThrift(runningCompaction.getJob().getExtent()); try { NamespaceId nsId = getContext().getNamespaceId(extent.tableId()); diff --git a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java index 1715d84af5..f2c57358d1 100644 --- a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java +++ b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java @@ -38,6 +38,7 @@ import java.util.UUID; import 
java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.compaction.thrift.TExternalCompaction; import org.apache.accumulo.core.conf.DefaultConfiguration; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.metadata.TServerInstance; @@ -87,6 +88,8 @@ public class CompactionCoordinatorTest { private final ServerAddress client; private final TabletClientService.Client tabletServerClient; + private Set<ExternalCompactionId> metadataCompactionIds = null; + protected TestCoordinator(CompactionFinalizer finalizer, LiveTServerSet tservers, ServerAddress client, TabletClientService.Client tabletServerClient, ServerContext context, AuditedSecurityOperation security) { @@ -158,6 +161,18 @@ public class CompactionCoordinatorTest { public void compactionFailed(TInfo tinfo, TCredentials credentials, String externalCompactionId, TKeyExtent extent) throws ThriftSecurityException {} + void setMetadataCompactionIds(Set<ExternalCompactionId> mci) { + metadataCompactionIds = mci; + } + + protected Set<ExternalCompactionId> readExternalCompactionIds() { + if (metadataCompactionIds == null) { + return RUNNING_CACHE.keySet(); + } else { + return metadataCompactionIds; + } + } + public Map<String,TreeMap<Short,TreeSet<TServerInstance>>> getQueues() { return CompactionCoordinator.QUEUE_SUMMARIES.QUEUES; } @@ -167,13 +182,14 @@ public class CompactionCoordinatorTest { } public Map<ExternalCompactionId,RunningCompaction> getRunning() { - return RUNNING; + return RUNNING_CACHE; } public void resetInternals() { getQueues().clear(); getIndex().clear(); getRunning().clear(); + metadataCompactionIds = null; } } @@ -586,4 +602,50 @@ public class CompactionCoordinatorTest { coordinator.close(); } + @Test + public void testCleanUpRunning() throws Exception { + PowerMock.resetAll(); + PowerMock.suppress(PowerMock.constructor(AbstractServer.class)); + + 
ServerContext context = PowerMock.createNiceMock(ServerContext.class); + expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes(); + + TCredentials creds = PowerMock.createNiceMock(TCredentials.class); + + CompactionFinalizer finalizer = PowerMock.createNiceMock(CompactionFinalizer.class); + LiveTServerSet tservers = PowerMock.createNiceMock(LiveTServerSet.class); + + ServerAddress client = PowerMock.createNiceMock(ServerAddress.class); + HostAndPort address = HostAndPort.fromString("localhost:10240"); + expect(client.getAddress()).andReturn(address).anyTimes(); + + TabletClientService.Client tsc = PowerMock.createNiceMock(TabletClientService.Client.class); + + AuditedSecurityOperation security = PowerMock.createNiceMock(AuditedSecurityOperation.class); + expect(security.canPerformSystemActions(creds)).andReturn(true); + + PowerMock.replayAll(); + + var coordinator = new TestCoordinator(finalizer, tservers, client, tsc, context, security); + coordinator.resetInternals(); + + var ecid1 = ExternalCompactionId.generate(UUID.randomUUID()); + var ecid2 = ExternalCompactionId.generate(UUID.randomUUID()); + var ecid3 = ExternalCompactionId.generate(UUID.randomUUID()); + + coordinator.getRunning().put(ecid1, new RunningCompaction(new TExternalCompaction())); + coordinator.getRunning().put(ecid2, new RunningCompaction(new TExternalCompaction())); + coordinator.getRunning().put(ecid3, new RunningCompaction(new TExternalCompaction())); + + coordinator.cleanUpRunning(); + + assertEquals(Set.of(ecid1, ecid2, ecid3), coordinator.getRunning().keySet()); + + coordinator.setMetadataCompactionIds(Set.of(ecid1, ecid2)); + + coordinator.cleanUpRunning(); + + assertEquals(Set.of(ecid1, ecid2), coordinator.getRunning().keySet()); + + } }