This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 81fee5c Persist external compactions in metadata table 81fee5c is described below commit 81fee5c19340c8cd794178365263b661365425b1 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Mar 19 19:58:48 2021 -0400 Persist external compactions in metadata table --- .../accumulo/core/metadata/schema/Ample.java | 5 + .../metadata/schema}/ExternalCompactionId.java | 2 +- .../schema/ExternalCompactionMetadata.java | 121 +++++++++++++++++++++ .../core/metadata/schema/MetadataSchema.java | 5 + .../core/metadata/schema/TabletMetadata.java | 18 ++- .../core/metadata/schema/TabletsMetadata.java | 4 + .../core/spi/compaction/CompactionExecutorId.java | 1 + .../core/util/compaction/CompactionJobImpl.java | 20 +++- .../server/constraints/MetadataConstraints.java | 4 +- .../server/metadata/TabletMutatorBase.java | 17 +++ .../accumulo/server/util/ManagerMetadataUtil.java | 7 +- .../coordinator/CompactionCoordinator.java | 2 +- .../accumulo/tserver/ThriftClientHandler.java | 2 +- .../accumulo/tserver/compactions/Compactable.java | 2 +- .../tserver/compactions/CompactionManager.java | 6 +- .../tserver/compactions/ExternalCompactionJob.java | 2 +- .../accumulo/tserver/tablet/CompactableImpl.java | 105 ++++++++++++------ .../accumulo/tserver/tablet/CompactableUtils.java | 2 +- .../accumulo/tserver/tablet/DatafileManager.java | 8 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 20 +++- .../apache/accumulo/tserver/tablet/TabletData.java | 10 ++ 21 files changed, 312 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index 1860122..334dac2 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -270,6 +270,11 @@ public interface Ample { TabletMutator deleteSuspension(); + TabletMutator putExternalCompaction(ExternalCompactionId ecid, + ExternalCompactionMetadata ecMeta); + + TabletMutator deleteExternalCompaction(ExternalCompactionId ecid); + /** * This method persist (or queues for persisting) previous put and deletes against this object. * Unless this method is called, previous calls will never be persisted. The purpose of this diff --git a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java similarity index 97% rename from server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java rename to core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java index e6880c4..c5ae588 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/compaction/ExternalCompactionId.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.server.compaction; +package org.apache.accumulo.core.metadata.schema; import java.util.UUID; diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java new file mode 100644 index 0000000..6398bf7 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.metadata.schema; + +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + +import java.util.List; +import java.util.Objects; +import java.util.Set; + +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; +import org.apache.accumulo.core.spi.compaction.CompactionKind; +import org.apache.hadoop.fs.Path; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +public class ExternalCompactionMetadata { + + private static final Gson GSON = new GsonBuilder().create(); + + private final Set<StoredTabletFile> jobFiles; + private final TabletFile compactTmpName; + private final TabletFile newFile; + private final String compactorId; + private final CompactionKind kind; + private final long priority; + private final CompactionExecutorId ceid; + + public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles, TabletFile compactTmpName, + TabletFile newFile, String compactorId, CompactionKind kind, long priority, + CompactionExecutorId ceid) { + this.jobFiles = Objects.requireNonNull(jobFiles); + this.compactTmpName = Objects.requireNonNull(compactTmpName); + this.newFile = Objects.requireNonNull(newFile); + this.compactorId = Objects.requireNonNull(compactorId); + this.kind = Objects.requireNonNull(kind); + this.priority = priority; + this.ceid = Objects.requireNonNull(ceid); + } + + public Set<StoredTabletFile> getJobFiles() { + return jobFiles; + } + + public TabletFile getCompactTmpName() { + return compactTmpName; + } + + public TabletFile getNewFile() { + return newFile; + } + + public String getCompactorId() { + return compactorId; + } + + public CompactionKind getKind() { + return kind; + } + + public long getPriority() { + return priority; + } + + public CompactionExecutorId getCompactionExecutorId() { + return ceid; + } + + // This class is used to serialize and deserialize this class using GSon. Any changes to this + // class must consider persisted data. + private static class GSonData { + List<String> inputs; + String tmp; + String dest; + String compactor; + String kind; + String executorId; + long priority; + } + + public String toJson() { + GSonData jData = new GSonData(); + jData.inputs = jobFiles.stream().map(StoredTabletFile::getMetaUpdateDelete).collect(toList()); + jData.tmp = compactTmpName.getMetaInsert(); + jData.dest = newFile.getMetaInsert(); + jData.compactor = compactorId; + jData.kind = kind.name(); + jData.executorId = ceid.getExernalName(); + jData.priority = priority; + return GSON.toJson(jData); + } + + public static ExternalCompactionMetadata fromJson(String json) { + GSonData jData = GSON.fromJson(json, GSonData.class); + return new ExternalCompactionMetadata( + jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()), + new TabletFile(new Path(jData.tmp)), new TabletFile(new Path(jData.dest)), jData.compactor, + CompactionKind.valueOf(jData.kind), jData.priority, + CompactionExecutorId.externalId(jData.executorId)); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 795ea03..02f8470 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -327,6 +327,11 @@ public class MetadataSchema { public static final Text NAME = new Text("chopped"); public static final ColumnFQ CHOPPED_COLUMN = new ColumnFQ(NAME, new Text("chopped")); } + + public static class ExternalCompactionColumnFamily { + public static final String STR_NAME = "ecomp"; + public static final Text NAME = new Text(STR_NAME); + } } /** diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index ee5283c..e075eff 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -19,7 +19,6 @@ package org.apache.accumulo.core.metadata.schema; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_QUAL; @@ -62,11 +61,13 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Bu import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.HostAndPort; @@ -109,6 +110,7 @@ public class TabletMetadata { private List<LogEntry> logs; private OptionalLong compact = OptionalLong.empty(); private Double splitRatio = null; + private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions; public enum LocationType { CURRENT, FUTURE, LAST @@ -129,7 +131,8 @@ public class TabletMetadata { LOGS, COMPACT_ID, SPLIT_RATIO, - SUSPEND + SUSPEND, + ECOMP } public static class Location extends TServerInstance { @@ -292,6 +295,11 @@ public class TabletMetadata { } } + public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() { + ensureFetched(ColumnType.ECOMP); + return extCompactions; + } + @VisibleForTesting public static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter, EnumSet<ColumnType> fetchedColumns, boolean buildKeyValueMap) { @@ -306,6 +314,7 @@ public class TabletMetadata { var filesBuilder = ImmutableMap.<StoredTabletFile,DataFileValue>builder(); var scansBuilder = ImmutableList.<StoredTabletFile>builder(); var logsBuilder = ImmutableList.<LogEntry>builder(); + var extCompBuilder = ImmutableMap.<ExternalCompactionId,ExternalCompactionMetadata>builder(); final var loadedFilesBuilder = ImmutableMap.<TabletFile,Long>builder(); ByteSequence row = null; @@ -392,6 +401,10 @@ public class TabletMetadata { case LogColumnFamily.STR_NAME: logsBuilder.add(LogEntry.fromMetaWalEntry(kv)); break; + case ExternalCompactionColumnFamily.STR_NAME: + extCompBuilder.put(ExternalCompactionId.of(qual), + ExternalCompactionMetadata.fromJson(val)); + break; default: throw new IllegalStateException("Unexpected family " + fam); } @@ -402,6 +415,7 @@ public class TabletMetadata { te.fetchedCols = fetchedColumns; te.scans = scansBuilder.build(); te.logs = logsBuilder.build(); + te.extCompactions = extCompBuilder.build(); if (buildKeyValueMap) { te.keyValues = kvBuilder.build(); } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index f67f81a..1d6fc89 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -51,6 +51,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Bu import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; @@ -198,6 +199,9 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable case TIME: qualifiers.add(TIME_COLUMN); break; + case ECOMP: + families.add(ExternalCompactionColumnFamily.NAME); + break; default: throw new IllegalArgumentException("Unknown col type " + colToFetch); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java index 498c8b4..dd07577 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/compaction/CompactionExecutorId.java @@ -36,6 +36,7 @@ public class CompactionExecutorId extends AbstractId<CompactionExecutorId> { super(canonical); } + // CBUG maybe all of the following methods should not be in SPI public boolean isExernalId() { return canonical().startsWith("e."); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java index 62ba955..16628e9 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/util/compaction/CompactionJobImpl.java @@ -27,6 +27,8 @@ import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; +import com.google.common.base.Preconditions; + /** * An immutable object that describes what files to compact and where to compact them. * @@ -40,14 +42,25 @@ public class CompactionJobImpl implements CompactionJob { private final Set<CompactableFile> files; private final CompactionKind kind; private boolean selectedAll; + private boolean hasSelectedAll; - CompactionJobImpl(long priority, CompactionExecutorId executor, Collection<CompactableFile> files, - CompactionKind kind, boolean selectedAllFiles) { + public CompactionJobImpl(long priority, CompactionExecutorId executor, + Collection<CompactableFile> files, CompactionKind kind, boolean selectedAllFiles) { this.priority = priority; this.executor = Objects.requireNonNull(executor); this.files = Set.copyOf(files); - this.kind = kind; + this.kind = Objects.requireNonNull(kind); this.selectedAll = selectedAllFiles; + this.hasSelectedAll = true; + } + + public CompactionJobImpl(long priority, CompactionExecutorId executor, + Collection<CompactableFile> files, CompactionKind kind) { + this.priority = priority; + this.executor = Objects.requireNonNull(executor); + this.files = Set.copyOf(files); + this.kind = Objects.requireNonNull(kind); + this.hasSelectedAll = false; } @Override @@ -85,6 +98,7 @@ public class CompactionJobImpl implements CompactionJob { } public boolean selectedAll() { + Preconditions.checkState(hasSelectedAll); return selectedAll; } diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index c9acb53..d18407d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -40,6 +40,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Ch import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; @@ -95,7 +96,8 @@ public class MetadataConstraints implements Constraint { LastLocationColumnFamily.NAME, FutureLocationColumnFamily.NAME, ChoppedColumnFamily.NAME, - ClonedColumnFamily.NAME)); + ClonedColumnFamily.NAME, + ExternalCompactionColumnFamily.NAME)); // @formatter:on private static boolean isValidColumn(ColumnUpdate cu) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java index fda24bd..8da4d41 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java @@ -26,11 +26,15 @@ import org.apache.accumulo.core.metadata.SuspendingTServer; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; @@ -222,6 +226,19 @@ public abstract class TabletMutatorBase implements Ample.TabletMutator { return this; } + @Override + public TabletMutator putExternalCompaction(ExternalCompactionId ecid, + ExternalCompactionMetadata ecMeta) { + mutation.put(ExternalCompactionColumnFamily.STR_NAME, ecid.canonical(), ecMeta.toJson()); + return this; + } + + @Override + public TabletMutator deleteExternalCompaction(ExternalCompactionId ecid) { + mutation.putDelete(ExternalCompactionColumnFamily.STR_NAME, ecid.canonical()); + return this; + } + protected Mutation getMutation() { updatesEnabled = false; return mutation; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java index ab07c06..0623020 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ManagerMetadataUtil.java @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -46,6 +47,7 @@ import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; @@ -182,7 +184,7 @@ public class ManagerMetadataUtil { public static void replaceDatafiles(ServerContext context, KeyExtent extent, Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles, TabletFile path, Long compactionId, DataFileValue size, String address, TServerInstance lastLocation, - ZooLock zooLock) { + ZooLock zooLock, Optional<ExternalCompactionId> ecid) { context.getAmple().putGcCandidates(extent.tableId(), datafilesToDelete); @@ -204,6 +206,9 @@ public class ManagerMetadataUtil { if (lastLocation != null && !lastLocation.equals(self)) tablet.deleteLocation(lastLocation, LocationType.LAST); + if (ecid.isPresent()) + tablet.deleteExternalCompaction(ecid.get()); + tablet.putZooLock(zooLock); tablet.mutate(); diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 4fad8ca..3f39e96 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.compaction.thrift.UnknownCompactionIdException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.tabletserver.thrift.CompactionStats; import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; @@ -55,7 +56,6 @@ import org.apache.accumulo.server.AbstractServer; import org.apache.accumulo.server.GarbageCollectionLogger; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.ServerOpts; -import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.server.compaction.ExternalCompactionUtil; import org.apache.accumulo.server.compaction.RetryableThriftCall; import org.apache.accumulo.server.compaction.RetryableThriftCall.RetriesExceededException; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java index d8cebd1..71ec2b6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java @@ -91,6 +91,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; @@ -122,7 +123,6 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.compaction.CompactionInfo; import org.apache.accumulo.server.compaction.Compactor; -import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.data.ServerMutation; import org.apache.accumulo.server.fs.TooManyFilesException; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java index f8cc442..07ac59e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/Compactable.java @@ -32,11 +32,11 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.accumulo.server.compaction.ExternalCompactionId; /** * Interface between compaction service and tablet. diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java index db4d97d..cfbb1ee 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java @@ -32,6 +32,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.spi.compaction.CompactionExecutorId; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.spi.compaction.CompactionServiceId; @@ -41,7 +42,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary; import org.apache.accumulo.core.util.threads.Threads; import org.apache.accumulo.fate.util.Retry; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics; import org.apache.accumulo.tserver.tablet.Tablet; import org.slf4j.Logger; @@ -428,6 +428,10 @@ public class CompactionManager { return getExternalExecutor(CompactionExecutorId.externalId(queueName)); } + public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent externt) { + runningExternalCompactions.put(ecid, externt); + } + public void commitExternalCompaction(ExternalCompactionId extCompactionId, Map<KeyExtent,Tablet> currentTablets, long fileSize, long entries) { KeyExtent extent = runningExternalCompactions.get(extCompactionId); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java index 3c02127..67830d2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/ExternalCompactionJob.java @@ -28,13 +28,13 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.thrift.CompactionReason; import org.apache.accumulo.core.tabletserver.thrift.CompactionType; import org.apache.accumulo.core.tabletserver.thrift.InputFile; import org.apache.accumulo.core.tabletserver.thrift.IteratorConfig; import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob; -import org.apache.accumulo.server.compaction.ExternalCompactionId; public class ExternalCompactionJob { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java index b9439a7..21ac1de 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java @@ -49,6 +49,8 @@ import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.spi.common.ServiceEnvironment; import org.apache.accumulo.core.spi.compaction.CompactionDispatcher.DispatchParameters; import org.apache.accumulo.core.spi.compaction.CompactionJob; @@ -60,7 +62,6 @@ import org.apache.accumulo.core.util.ratelimit.RateLimiter; import org.apache.accumulo.server.ServiceEnvironmentImpl; import org.apache.accumulo.server.compaction.CompactionStats; import org.apache.accumulo.server.compaction.Compactor.CompactionCanceledException; -import org.apache.accumulo.server.compaction.ExternalCompactionId; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.tserver.compactions.Compactable; import org.apache.accumulo.tserver.compactions.CompactionManager; @@ -119,6 +120,15 @@ public class CompactableImpl implements Compactable { private volatile boolean closed = false; + // TODO move to top of class + private static class ExternalCompactionInfo { + ExternalCompactionMetadata meta; + CompactionJob job; + } + + private Map<ExternalCompactionId,ExternalCompactionInfo> externalCompactions = + new ConcurrentHashMap<>(); + // This interface exists for two purposes. First it allows abstraction of new and old // implementations for user pluggable file selection code. Second it facilitates placing code // outside of this class. @@ -131,9 +141,32 @@ public class CompactableImpl implements Compactable { } - public CompactableImpl(Tablet tablet, CompactionManager manager) { + public CompactableImpl(Tablet tablet, CompactionManager manager, + Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions) { this.tablet = tablet; this.manager = manager; + + var dataFileSizes = tablet.getDatafileManager().getDatafileSizes(); + + extCompactions.forEach((ecid, ecMeta) -> { + // CBUG ensure files for each external compaction are disjoint + allCompactingFiles.addAll(ecMeta.getJobFiles()); + Collection<CompactableFile> files = ecMeta.getJobFiles().stream() + .map(f -> new CompactableFileImpl(f, dataFileSizes.get(f))).collect(Collectors.toList()); + CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(), + ecMeta.getCompactionExecutorId(), files, ecMeta.getKind()); + runnningJobs.add(job); + + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + ecInfo.job = job; + ecInfo.meta = ecMeta; + externalCompactions.put(ecid, ecInfo); + + manager.registerExternalCompaction(ecid, getExtent()); + }); + + compactionRunning = !allCompactingFiles.isEmpty(); + this.servicesInUse = Suppliers.memoizeWithExpiration(() -> { HashSet<CompactionServiceId> servicesIds = new HashSet<>(); for (CompactionKind kind : CompactionKind.values()) { @@ -553,9 +586,6 @@ public class CompactableImpl implements Compactable { CompactionHelper localHelper; List<IteratorSetting> iters = List.of(); CompactionConfig localCompactionCfg; - public TabletFile compactTmpName; - public CompactionJob job; - public TabletFile newFile; } private CompactionInfo reserveFilesForCompaction(CompactionServiceId service, CompactionJob job) { @@ -720,9 +750,6 @@ public class CompactableImpl implements Compactable { } } - // TODO move to top of class - private Map<ExternalCompactionId,CompactionInfo> externalCompactions = new ConcurrentHashMap<>(); - @Override public ExternalCompactionJob reserveExternalCompaction(CompactionServiceId service, CompactionJob job, String compactorId) { @@ -734,18 +761,25 @@ public class CompactableImpl implements Compactable { // CBUG add external compaction info to metadata table try { // CBUG share code w/ CompactableUtil and/or move there - cInfo.newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C"); - cInfo.compactTmpName = new TabletFile(new Path(cInfo.newFile.getMetaInsert() + "_tmp")); + var newFile = tablet.getNextMapFilename(!cInfo.propogateDeletes ? "A" : "C"); + var compactTmpName = new TabletFile(new Path(newFile.getMetaInsert() + "_tmp")); ExternalCompactionId externalCompactionId = ExternalCompactionId.generate(); - cInfo.job = job; + ExternalCompactionInfo ecInfo = new ExternalCompactionInfo(); + + ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles, compactTmpName, newFile, + compactorId, job.getKind(), job.getPriority(), job.getExecutor()); + tablet.getContext().getAmple().mutateTablet(getExtent()) + .putExternalCompaction(externalCompactionId, ecInfo.meta).mutate(); - externalCompactions.put(externalCompactionId, cInfo); + ecInfo.job = job; + + externalCompactions.put(externalCompactionId, ecInfo); // CBUG because this is an RPC the return may never get to the caller... however the caller // may be alive.... maybe the caller can set the externalCompactionId it working on in ZK - return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, cInfo.compactTmpName, + return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, compactTmpName, getExtent(), externalCompactionId, job.getPriority(), job.getKind(), cInfo.iters); } catch (Exception e) { @@ -758,25 +792,34 @@ public class CompactableImpl implements Compactable { public void commitExternalCompaction(ExternalCompactionId extCompactionId, long fileSize, long entries) { // CBUG double check w/ java docs that only one thread can remove - CompactionInfo cInfo = externalCompactions.remove(extCompactionId); - - if (cInfo != null) { - log.debug("Attempting to commit external compaction {}", extCompactionId); - // TODO do a sanity check that files exists in dfs? - StoredTabletFile metaFile = null; - try { - metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(cInfo.jobFiles, - cInfo.compactTmpName, cInfo.newFile, compactionId, - new DataFileValue(fileSize, entries)); - TabletLogger.compacted(getExtent(), cInfo.job, metaFile); - } catch (Exception e) { - metaFile = null; - log.error("Error committing external compaction {}", extCompactionId, e); - throw new RuntimeException(e); - } finally { - completeCompaction(cInfo.job, cInfo.jobFiles, metaFile); - log.debug("Completed commit of external compaction{}", extCompactionId); + ExternalCompactionInfo ecInfo = externalCompactions.get(extCompactionId); + + if (ecInfo != null) { + synchronized (ecInfo) { + if (!externalCompactions.containsKey(extCompactionId)) { + // since this method is called by RPCs there could be multiple concurrent calls so defend + // against that + return; + } + log.debug("Attempting to commit external compaction {}", extCompactionId); + // TODO do a sanity check that files exists in dfs? + StoredTabletFile metaFile = null; + try { + metaFile = tablet.getDatafileManager().bringMajorCompactionOnline( + ecInfo.meta.getJobFiles(), ecInfo.meta.getCompactTmpName(), ecInfo.meta.getNewFile(), + compactionId, new DataFileValue(fileSize, entries), Optional.of(extCompactionId)); + TabletLogger.compacted(getExtent(), ecInfo.job, metaFile); + } catch (Exception e) { + metaFile = null; + log.error("Error committing external compaction {}", extCompactionId, e); + throw new RuntimeException(e); + } finally { + completeCompaction(ecInfo.job, ecInfo.meta.getJobFiles(), metaFile); + externalCompactions.remove(extCompactionId); + log.debug("Completed commit of external compaction{}", extCompactionId); + } } + } else { log.debug("Ignoring request to commit external compaction that is unknown {}", extCompactionId); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java index d6da2f8..f5b0b5e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java @@ -616,7 +616,7 @@ public class CompactableUtils { metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(compactFiles.keySet(), compactTmpName, newFile, compactionId, - new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten())); + new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()), Optional.empty()); return metaFile; } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 8a00228..56483bc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -40,6 +41,7 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; import org.apache.accumulo.core.util.MapCounter; import org.apache.accumulo.core.util.Pair; @@ -396,8 +398,8 @@ class DatafileManager { } StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> oldDatafiles, - TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, DataFileValue dfv) - throws IOException { + TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, DataFileValue dfv, + Optional<ExternalCompactionId> ecid) throws IOException { final KeyExtent extent = tablet.getExtent(); VolumeManager vm = tablet.getTabletServer().getContext().getVolumeManager(); long t1, t2; @@ -453,7 +455,7 @@ class DatafileManager { ManagerMetadataUtil.replaceDatafiles(tablet.getContext(), extent, oldDatafiles, filesInUseByScans, newFile, compactionId, dfv, tablet.getTabletServer().getClientAddressString(), lastLocation, - tablet.getTabletServer().getLock()); + tablet.getTabletServer().getLock(), ecid); tablet.setLastCompactionID(compactionId); removeFilesAfterScan(filesInUseByScans); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index c3cd98d..b50b48e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -44,6 +44,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Durability; @@ -75,6 +76,8 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.protobuf.ProtobufUtil; @@ -422,24 +425,35 @@ public class Tablet { // look for hints of a failure on the previous tablet server if (!logEntries.isEmpty()) { // look for any temp files hanging around - removeOldTemporaryFiles(); + removeOldTemporaryFiles(data.getExternalCompactions()); } - this.compactable = new CompactableImpl(this, tabletServer.getCompactionManager()); + this.compactable = new CompactableImpl(this, tabletServer.getCompactionManager(), + data.getExternalCompactions()); } public ServerContext getContext() { return context; } - private void removeOldTemporaryFiles() { + private void removeOldTemporaryFiles( + Map<ExternalCompactionId,ExternalCompactionMetadata> externalCompactions) { // remove any temporary files created by a previous tablet server try { + + var extCompactionFiles = externalCompactions.values().stream() + .map(ecMeta -> ecMeta.getCompactTmpName().getPath()).collect(Collectors.toSet()); + for (Volume volume : getTabletServer().getVolumeManager().getVolumes()) { String dirUri = volume.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + extent.tableId() + Path.SEPARATOR + dirName; for (FileStatus tmp : volume.getFileSystem().globStatus(new Path(dirUri, "*_tmp"))) { + + if (extCompactionFiles.contains(tmp.getPath())) { + continue; + } + try { log.debug("Removing old temp file {}", tmp.getPath()); volume.getFileSystem().delete(tmp.getPath(), false); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java index 2f7cac3..59d9360 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java @@ -30,6 +30,8 @@ import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -48,6 +50,7 @@ public class TabletData { private Map<Long,List<TabletFile>> bulkImported = new HashMap<>(); private long splitTime = 0; private String directoryName = null; + private Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions; // Read tablet data from metadata tables public TabletData(TabletMetadata meta) { @@ -67,6 +70,8 @@ public class TabletData { meta.getLoaded().forEach((path, txid) -> { bulkImported.computeIfAbsent(txid, k -> new ArrayList<>()).add(path); }); + + this.extCompactions = meta.getExternalCompactions(); } // Data pulled from an existing tablet to make a split @@ -81,6 +86,7 @@ public class TabletData { this.lastLocation = lastLocation; this.bulkImported = bulkIngestedFiles; this.splitTime = System.currentTimeMillis(); + this.extCompactions = Map.of(); } public MetadataTime getTime() { @@ -122,4 +128,8 @@ public class TabletData { public long getSplitTime() { return splitTime; } + + public Map<ExternalCompactionId,ExternalCompactionMetadata> getExternalCompactions() { + return extCompactions; + } }