This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 955c44a  fixed #2019 repopulate selected set for ext compactions w/ other improvements
955c44a is described below

commit 955c44a7e409b34f25d9fb5c9786e907083502fa
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Thu Apr 15 20:58:37 2021 -0400

    fixed #2019 repopulate selected set for ext compactions w/ other improvements
    
      * repopulated set of selected files for external compaction
      * added lots of sanity checks to existing external compactions on tablet load
      * added IT to compact 200 tablets externally, test is timing out
---
 .../schema/ExternalCompactionMetadata.java         |  48 ++++-
 .../coordinator/DeadCompactionDetector.java        |   1 +
 .../accumulo/tserver/tablet/CompactableImpl.java   | 231 ++++++++++++++++++---
 .../accumulo/tserver/tablet/CompactableUtils.java  |  10 +-
 .../accumulo/tserver/tablet/DatafileManager.java   |  16 +-
 .../apache/accumulo/test/ExternalCompactionIT.java |  64 +++++-
 6 files changed, 324 insertions(+), 46 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java
index d1d127c..d64a9a1 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionMetadata.java
@@ -39,29 +39,42 @@ public class ExternalCompactionMetadata {
   private static final Gson GSON = new GsonBuilder().create();
 
   private final Set<StoredTabletFile> jobFiles;
+  private final Set<StoredTabletFile> nextFiles;
   private final TabletFile compactTmpName;
   private final TabletFile newFile;
   private final String compactorId;
   private final CompactionKind kind;
   private final long priority;
   private final CompactionExecutorId ceid;
-
-  public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles, TabletFile 
compactTmpName,
-      TabletFile newFile, String compactorId, CompactionKind kind, long 
priority,
-      CompactionExecutorId ceid) {
+  private final boolean propogateDeletes;
+  private final boolean selectedAll;
+  private final Long compactionId;
+
+  public ExternalCompactionMetadata(Set<StoredTabletFile> jobFiles, 
Set<StoredTabletFile> nextFiles,
+      TabletFile compactTmpName, TabletFile newFile, String compactorId, 
CompactionKind kind,
+      long priority, CompactionExecutorId ceid, boolean propogateDeletes, 
boolean selectedAll,
+      Long compactionId) {
     this.jobFiles = Objects.requireNonNull(jobFiles);
+    this.nextFiles = Objects.requireNonNull(nextFiles);
     this.compactTmpName = Objects.requireNonNull(compactTmpName);
     this.newFile = Objects.requireNonNull(newFile);
     this.compactorId = Objects.requireNonNull(compactorId);
     this.kind = Objects.requireNonNull(kind);
     this.priority = priority;
     this.ceid = Objects.requireNonNull(ceid);
+    this.propogateDeletes = propogateDeletes;
+    this.selectedAll = selectedAll;
+    this.compactionId = compactionId;
   }
 
   public Set<StoredTabletFile> getJobFiles() {
     return jobFiles;
   }
 
+  /**
+   * Returns the files that belong to the compaction's selected set but are not among
+   * {@link #getJobFiles()}.
+   */
+  public Set<StoredTabletFile> getNextFiles() {
+    return this.nextFiles;
+  }
+
   public TabletFile getCompactTmpName() {
     return compactTmpName;
   }
@@ -86,37 +99,62 @@ public class ExternalCompactionMetadata {
     return ceid;
   }
 
+  /**
+   * @return the propogateDeletes flag recorded when this external compaction was started
+   */
+  public boolean isPropogateDeletes() {
+    return this.propogateDeletes;
+  }
+
+  /**
+   * @return the selectedAll flag recorded when this external compaction was started
+   */
+  public boolean isSelectedAll() {
+    return this.selectedAll;
+  }
+
+  /**
+   * @return the compaction id recorded for this external compaction; may be null
+   */
+  public Long getCompactionId() {
+    return this.compactionId;
+  }
+
   // This class is used to serialize and deserialize this class using GSon. 
Any changes to this
   // class must consider persisted data.
   private static class GSonData {
     List<String> inputs;
+    List<String> nextFiles;
     String tmp;
     String dest;
     String compactor;
     String kind;
     String executorId;
     long priority;
+    boolean propDels;
+    boolean selectedAll;
+    Long compactionId;
   }
 
   public String toJson() {
     GSonData jData = new GSonData();
+
     jData.inputs = 
jobFiles.stream().map(StoredTabletFile::getMetaUpdateDelete).collect(toList());
+    jData.nextFiles =
+        
nextFiles.stream().map(StoredTabletFile::getMetaUpdateDelete).collect(toList());
     jData.tmp = compactTmpName.getMetaInsert();
     jData.dest = newFile.getMetaInsert();
     jData.compactor = compactorId;
     jData.kind = kind.name();
     jData.executorId = ceid.getExernalName();
     jData.priority = priority;
+    jData.propDels = propogateDeletes;
+    jData.selectedAll = selectedAll;
+    jData.compactionId = compactionId;
     return GSON.toJson(jData);
   }
 
   public static ExternalCompactionMetadata fromJson(String json) {
     GSonData jData = GSON.fromJson(json, GSonData.class);
+
+    // nextFiles is a newly added field. Gson leaves fields absent from the JSON as null, so
+    // metadata persisted without it must not be dereferenced blindly; treat it as empty.
+    Set<StoredTabletFile> nextFiles = jData.nextFiles == null ? Set.of()
+        : jData.nextFiles.stream().map(StoredTabletFile::new).collect(toSet());
+
     return new ExternalCompactionMetadata(
         jData.inputs.stream().map(StoredTabletFile::new).collect(toSet()),
+        nextFiles,
         new TabletFile(new Path(jData.tmp)), new TabletFile(new Path(jData.dest)), jData.compactor,
         CompactionKind.valueOf(jData.kind), jData.priority,
-        CompactionExecutorId.externalId(jData.executorId));
+        CompactionExecutorId.externalId(jData.executorId), jData.propDels, jData.selectedAll,
+        jData.compactionId);
   }
 
   @Override
diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
index 1feb7f5..61e27c6 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/DeadCompactionDetector.java
@@ -125,6 +125,7 @@ public class DeadCompactionDetector {
     danglingEcids.forEach(
         ecid -> log.debug("Detected dangling external compaction final state 
marker {}", ecid));
 
+    // todo add logging in impl
     context.getAmple().deleteExternalCompactionFinalStates(danglingEcids);
   }
 
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
index 177358c..f74a060 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableImpl.java
@@ -22,16 +22,20 @@ import static 
org.apache.accumulo.tserver.TabletStatsKeeper.Operation.MAJOR;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -58,6 +62,7 @@ import org.apache.accumulo.core.spi.compaction.CompactionJob;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.compaction.CompactionJobImpl;
 import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.ServiceEnvironmentImpl;
@@ -69,6 +74,7 @@ import 
org.apache.accumulo.tserver.compactions.CompactionManager;
 import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
 import org.apache.accumulo.tserver.managermessage.TabletStatusMessage;
 import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -151,23 +157,42 @@ public class CompactableImpl implements Compactable {
 
     var dataFileSizes = tablet.getDatafileManager().getDatafileSizes();
 
+    Map<ExternalCompactionId,String> extCompactionsToRemove = new HashMap<>();
+
+    initializeSelection(extCompactions, tablet, extCompactionsToRemove);
+
+    sanityCheckExternalCompactions(extCompactions, dataFileSizes.keySet(), 
extCompactionsToRemove);
+
+    extCompactionsToRemove.forEach((ecid, reason) -> {
+      log.warn("Removing external compaction {} for {} because {} meta: {}", 
ecid,
+          tablet.getExtent(), reason, extCompactions.get(ecid).toJson());
+    });
+
+    if (!extCompactionsToRemove.isEmpty()) {
+      var tabletMutator = 
tablet.getContext().getAmple().mutateTablet(tablet.getExtent());
+      
extCompactionsToRemove.keySet().forEach(tabletMutator::deleteExternalCompaction);
+      tabletMutator.mutate();
+    }
+
     extCompactions.forEach((ecid, ecMeta) -> {
-      // CBUG ensure files for each external compaction are disjoint
-      allCompactingFiles.addAll(ecMeta.getJobFiles());
-      Collection<CompactableFile> files = ecMeta.getJobFiles().stream()
-          .map(f -> new CompactableFileImpl(f, 
dataFileSizes.get(f))).collect(Collectors.toList());
-      CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(),
-          ecMeta.getCompactionExecutorId(), files, ecMeta.getKind());
-      runnningJobs.add(job);
+      if (!extCompactionsToRemove.containsKey(ecid)) {
+        allCompactingFiles.addAll(ecMeta.getJobFiles());
+        Collection<CompactableFile> files =
+            ecMeta.getJobFiles().stream().map(f -> new CompactableFileImpl(f, 
dataFileSizes.get(f)))
+                .collect(Collectors.toList());
+        CompactionJob job = new CompactionJobImpl(ecMeta.getPriority(),
+            ecMeta.getCompactionExecutorId(), files, ecMeta.getKind());
+        runnningJobs.add(job);
 
-      ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
-      ecInfo.job = job;
-      ecInfo.meta = ecMeta;
-      externalCompactions.put(ecid, ecInfo);
+        ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
+        ecInfo.job = job;
+        ecInfo.meta = ecMeta;
+        externalCompactions.put(ecid, ecInfo);
 
-      log.debug("Loaded tablet {} has existing external compaction {} {}", 
getExtent(), ecid,
-          ecMeta);
-      manager.registerExternalCompaction(ecid, getExtent());
+        log.debug("Loaded tablet {} has existing external compaction {} {}", 
getExtent(), ecid,
+            ecMeta);
+        manager.registerExternalCompaction(ecid, getExtent());
+      }
     });
 
     compactionRunning = !allCompactingFiles.isEmpty();
@@ -181,6 +206,29 @@ public class CompactableImpl implements Compactable {
     }, 2, TimeUnit.SECONDS);
   }
 
+  /**
+   * Flags external compactions that are inconsistent with the tablet's current files.
+   *
+   * <p>A compaction whose job files are not all contained in {@code tabletFiles} is flagged
+   * individually. If the job files of any two of the remaining compactions overlap, every external
+   * compaction in {@code extCompactions} is flagged, not only the overlapping ones.
+   *
+   * @param extCompactions external compactions read from the tablet's metadata
+   * @param tabletFiles the tablet's current data files
+   * @param extCompactionsToRemove output map of compaction id to removal reason; entries already
+   *        present are not overwritten
+   */
+  private void sanityCheckExternalCompactions(
+      Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions,
+      Set<StoredTabletFile> tabletFiles, Map<ExternalCompactionId,String> extCompactionsToRemove) {
+
+    Set<StoredTabletFile> seen = new HashSet<>();
+    AtomicBoolean overlap = new AtomicBoolean(false);
+
+    extCompactions.forEach((ecid, ecMeta) -> {
+      if (!tabletFiles.containsAll(ecMeta.getJobFiles())) {
+        extCompactionsToRemove.putIfAbsent(ecid, "Has files outside of tablet files");
+      } else {
+        if (!Collections.disjoint(seen, ecMeta.getJobFiles())) {
+          overlap.set(true);
+        }
+        // Remember these files so later compactions are checked against them. Without this,
+        // seen stays empty and the overlap check above can never fire.
+        seen.addAll(ecMeta.getJobFiles());
+      }
+    });
+
+    if (overlap.get()) {
+      extCompactions.keySet().forEach(ecid -> {
+        extCompactionsToRemove.putIfAbsent(ecid, "Some external compaction files overlap");
+      });
+    }
+
+  }
+
   void initiateChop() {
 
     Set<StoredTabletFile> allFiles = tablet.getDatafiles().keySet();
@@ -321,6 +369,131 @@ public class CompactableImpl implements Compactable {
     }
   }
 
+  /**
+   * Rebuilds this compactable's selection state from the USER and SELECTOR external compactions
+   * found in the tablet's metadata.
+   *
+   * <p>All such compactions must agree on:
+   * <ul>
+   * <li>kind</li>
+   * <li>selected set (job files plus next files)</li>
+   * <li>selectedAll</li>
+   * <li>compaction id</li>
+   * <li>propogateDeletes</li>
+   * </ul>
+   * For USER compactions, the id must also match the id returned by {@code tablet.getCompactionID()}.
+   *
+   * <p>If any check fails, every USER and SELECTOR external compaction is added to
+   * {@code externalCompactionsToRemove} with the collected reasons, and no selection state is
+   * changed. Otherwise, if at least one such compaction exists, the helper, selected files, select
+   * kind, selectedAll and select status are set. For USER compactions, the compaction config and
+   * compaction id are set as well.
+   *
+   * @param extCompactions external compactions read from the tablet's metadata
+   * @param tablet the tablet being loaded
+   * @param externalCompactionsToRemove output map of compaction id to removal reason; entries
+   *        already present are not overwritten
+   */
+  private void initializeSelection(
+      Map<ExternalCompactionId,ExternalCompactionMetadata> extCompactions, Tablet tablet,
+      Map<ExternalCompactionId,String> externalCompactionsToRemove) {
+    CompactionKind extKind = null;
+    boolean unexpectedExternal = false;
+    Set<StoredTabletFile> tmpSelectedFiles = null;
+    Boolean selAll = null;
+    Long cid = null;
+    Boolean propDel = null;
+    int count = 0;
+
+    ArrayList<String> reasons = new ArrayList<>();
+
+    for (Entry<ExternalCompactionId,ExternalCompactionMetadata> entry : extCompactions.entrySet()) {
+      var ecMeta = entry.getValue();
+
+      // CBUG what about chop?
+      if (ecMeta.getKind() != CompactionKind.USER && ecMeta.getKind() != CompactionKind.SELECTOR) {
+        continue;
+      }
+
+      count++;
+
+      if (extKind == null || extKind == ecMeta.getKind()) {
+        extKind = ecMeta.getKind();
+      } else {
+        reasons.add("Saw USER and SELECTOR");
+        unexpectedExternal = true;
+        break;
+      }
+
+      // Each compaction records the selected set split into its own job files and the remaining
+      // next files; their union must be the same for every compaction.
+      if (tmpSelectedFiles == null) {
+        tmpSelectedFiles = Sets.union(ecMeta.getJobFiles(), ecMeta.getNextFiles());
+      } else if (!Sets.union(ecMeta.getJobFiles(), ecMeta.getNextFiles())
+          .equals(tmpSelectedFiles)) {
+        reasons.add("Selected set of files differs");
+        unexpectedExternal = true;
+        break;
+      }
+
+      if (selAll == null) {
+        selAll = ecMeta.isSelectedAll();
+      } else if (selAll != ecMeta.isSelectedAll()) {
+        unexpectedExternal = true;
+        reasons.add("Disagreement on selectedAll");
+        break;
+      }
+
+      // USER compactions must carry one common compaction id; SELECTOR compactions must carry none.
+      if (ecMeta.getKind() == CompactionKind.USER) {
+        if (ecMeta.getCompactionId() == null) {
+          unexpectedExternal = true;
+          reasons.add("Missing compactionId");
+          break;
+        } else if (cid == null) {
+          cid = ecMeta.getCompactionId();
+        } else if (!cid.equals(ecMeta.getCompactionId())) {
+          unexpectedExternal = true;
+          reasons.add("Disagreement on compactionId");
+          break;
+        }
+      } else if (ecMeta.getCompactionId() != null) {
+        unexpectedExternal = true;
+        reasons.add("Unexpected compactionId");
+        break;
+      }
+
+      if (propDel == null) {
+        propDel = ecMeta.isPropogateDeletes();
+      } else if (propDel != ecMeta.isPropogateDeletes()) {
+        unexpectedExternal = true;
+        reasons.add("Disagreement on propogateDeletes");
+        break;
+      }
+
+    }
+
+    // reserveFilesForCompaction only disables propogateDeletes when a single job covers every
+    // selected file, so several concurrent compactions that all skip deletes are inconsistent.
+    if (propDel != null && !propDel && count > 1) {
+      unexpectedExternal = true;
+      reasons.add("Concurrent compactions not propogatingDeletes");
+    }
+
+    Pair<Long,CompactionConfig> idAndCfg = null;
+    if (extKind == CompactionKind.USER) {
+      try {
+        idAndCfg = tablet.getCompactionID();
+        if (!idAndCfg.getFirst().equals(cid)) {
+          unexpectedExternal = true;
+          reasons.add("Compaction id mismatch with zookeeper");
+        }
+      } catch (NoNodeException e) {
+        unexpectedExternal = true;
+        reasons.add("No compaction id in zookeeper");
+      }
+    }
+
+    if (unexpectedExternal) {
+      String reason = reasons.toString();
+      extCompactions.entrySet().stream().filter(e -> {
+        var kind = e.getValue().getKind();
+        return kind == CompactionKind.SELECTOR || kind == CompactionKind.USER;
+      }).map(Entry::getKey)
+          .forEach(ecid -> externalCompactionsToRemove.putIfAbsent(ecid, reason));
+      return;
+    }
+
+    if (extKind != null) {
+
+      if (extKind == CompactionKind.USER) {
+        this.chelper = CompactableUtils.getHelper(extKind, tablet, cid, idAndCfg.getSecond());
+        this.compactionConfig = idAndCfg.getSecond();
+        this.compactionId = cid;
+      } else if (extKind == CompactionKind.SELECTOR) {
+        this.chelper = CompactableUtils.getHelper(extKind, tablet, null, null);
+      }
+
+      this.selectedFiles.clear();
+      this.selectedFiles.addAll(tmpSelectedFiles);
+      this.selectKind = extKind;
+      this.selectedAll = selAll;
+      this.selectStatus = SpecialStatus.SELECTED;
+
+      log.debug(
+          "Selected compaction status initialized from external compactions {} {} {} {}",
+          getExtent(), selectStatus, selectedAll, asFileNames(selectedFiles));
+    }
+  }
+
   private void initiateSelection(CompactionKind kind, Long compactionId,
       CompactionConfig compactionConfig) {
     Preconditions.checkArgument(kind == CompactionKind.USER || kind == 
CompactionKind.SELECTOR);
@@ -585,12 +758,13 @@ public class CompactableImpl implements Compactable {
 
   private static class CompactionInfo {
     Set<StoredTabletFile> jobFiles;
-    Long compactionId = null;
     Long checkCompactionId = null;
     boolean propogateDeletes = true;
     CompactionHelper localHelper;
     List<IteratorSetting> iters = List.of();
     CompactionConfig localCompactionCfg;
+    boolean selectedAll;
+    Set<StoredTabletFile> selectedFiles;
   }
 
   private CompactionInfo reserveFilesForCompaction(CompactionServiceId 
service, CompactionJob job) {
@@ -667,6 +841,10 @@ public class CompactableImpl implements Compactable {
               && cInfo.jobFiles.containsAll(selectedFiles)) {
             cInfo.propogateDeletes = false;
           }
+
+          cInfo.selectedFiles = Set.copyOf(selectedFiles);
+          cInfo.selectedAll = selectedAll;
+
           break;
         default:
           if (((CompactionJobImpl) job).selectedAll()) {
@@ -674,11 +852,9 @@ public class CompactableImpl implements Compactable {
             // dropped.
             cInfo.propogateDeletes = false;
           }
-      }
 
-      if (job.getKind() == CompactionKind.USER && selectKind == job.getKind()
-          && selectedFiles.equals(cInfo.jobFiles)) {
-        cInfo.compactionId = this.compactionId;
+          // CBUG what about chop?
+          cInfo.selectedFiles = Set.of();
       }
 
       if (job.getKind() == CompactionKind.USER) {
@@ -735,8 +911,8 @@ public class CompactableImpl implements Compactable {
       TabletLogger.compacting(getExtent(), job, cInfo.localCompactionCfg);
       tablet.incrementStatusMajor();
 
-      metaFile = CompactableUtils.compact(tablet, job, cInfo.jobFiles, 
cInfo.compactionId,
-          cInfo.propogateDeletes, cInfo.localHelper, cInfo.iters,
+      metaFile = CompactableUtils.compact(tablet, job, cInfo.jobFiles, 
cInfo.checkCompactionId,
+          cInfo.selectedFiles, cInfo.propogateDeletes, cInfo.localHelper, 
cInfo.iters,
           new CompactionCheck(service, job.getKind(), 
cInfo.checkCompactionId), readLimiter,
           writeLimiter, stats);
 
@@ -771,8 +947,10 @@ public class CompactableImpl implements Compactable {
 
       ExternalCompactionInfo ecInfo = new ExternalCompactionInfo();
 
-      ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles, 
compactTmpName, newFile,
-          compactorId, job.getKind(), job.getPriority(), job.getExecutor());
+      ecInfo.meta = new ExternalCompactionMetadata(cInfo.jobFiles,
+          Sets.difference(cInfo.selectedFiles, cInfo.jobFiles), 
compactTmpName, newFile,
+          compactorId, job.getKind(), job.getPriority(), job.getExecutor(), 
cInfo.propogateDeletes,
+          cInfo.selectedAll, cInfo.checkCompactionId);
       tablet.getContext().getAmple().mutateTablet(getExtent())
           .putExternalCompaction(externalCompactionId, ecInfo.meta).mutate();
 
@@ -780,8 +958,6 @@ public class CompactableImpl implements Compactable {
 
       externalCompactions.put(externalCompactionId, ecInfo);
 
-      // CBUG because this is an RPC the return may never get to the caller... 
however the caller
-      // may be alive.... maybe the caller can set the externalCompactionId it 
working on in ZK
       return new ExternalCompactionJob(cInfo.jobFiles, cInfo.propogateDeletes, 
compactTmpName,
           getExtent(), externalCompactionId, job.getPriority(), job.getKind(), 
cInfo.iters);
 
@@ -814,9 +990,12 @@ public class CompactableImpl implements Compactable {
         // TODO do a sanity check that files exists in dfs?
         StoredTabletFile metaFile = null;
         try {
+          // possibly do some sanity checks here
           metaFile = tablet.getDatafileManager().bringMajorCompactionOnline(
               ecInfo.meta.getJobFiles(), ecInfo.meta.getCompactTmpName(), 
ecInfo.meta.getNewFile(),
-              compactionId, new DataFileValue(fileSize, entries), 
Optional.of(extCompactionId));
+              ecInfo.meta.getCompactionId(),
+              Sets.union(ecInfo.meta.getJobFiles(), 
ecInfo.meta.getNextFiles()),
+              new DataFileValue(fileSize, entries), 
Optional.of(extCompactionId));
           TabletLogger.compacted(getExtent(), ecInfo.job, metaFile);
         } catch (Exception e) {
           metaFile = null;
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index f5b0b5e..45cf61e 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -536,10 +536,10 @@ public class CompactableUtils {
   }
 
   static StoredTabletFile compact(Tablet tablet, CompactionJob job, 
Set<StoredTabletFile> jobFiles,
-      Long compactionId, boolean propogateDeletes, 
CompactableImpl.CompactionHelper helper,
-      List<IteratorSetting> iters, CompactionCheck compactionCheck, 
RateLimiter readLimiter,
-      RateLimiter writeLimiter, CompactionStats stats)
-      throws IOException, CompactionCanceledException {
+      Long compactionId, Set<StoredTabletFile> selectedFiles, boolean 
propogateDeletes,
+      CompactableImpl.CompactionHelper helper, List<IteratorSetting> iters,
+      CompactionCheck compactionCheck, RateLimiter readLimiter, RateLimiter 
writeLimiter,
+      CompactionStats stats) throws IOException, CompactionCanceledException {
     StoredTabletFile metaFile;
     CompactionEnv cenv = new CompactionEnv() {
       @Override
@@ -615,7 +615,7 @@ public class CompactableUtils {
     stats.add(mcs);
 
     metaFile = 
tablet.getDatafileManager().bringMajorCompactionOnline(compactFiles.keySet(),
-        compactTmpName, newFile, compactionId,
+        compactTmpName, newFile, compactionId, selectedFiles,
         new DataFileValue(mcs.getFileSize(), mcs.getEntriesWritten()), 
Optional.empty());
     return metaFile;
   }
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
index 56483bc..b0fb60d 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java
@@ -398,8 +398,9 @@ class DatafileManager {
   }
 
   StoredTabletFile bringMajorCompactionOnline(Set<StoredTabletFile> 
oldDatafiles,
-      TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId, 
DataFileValue dfv,
-      Optional<ExternalCompactionId> ecid) throws IOException {
+      TabletFile tmpDatafile, TabletFile newDatafile, Long compactionId,
+      Set<StoredTabletFile> selectedFiles, DataFileValue dfv, 
Optional<ExternalCompactionId> ecid)
+      throws IOException {
     final KeyExtent extent = tablet.getExtent();
     VolumeManager vm = 
tablet.getTabletServer().getContext().getVolumeManager();
     long t1, t2;
@@ -420,6 +421,9 @@ class DatafileManager {
     TServerInstance lastLocation = null;
     // calling insert to get the new file before inserting into the metadata
     StoredTabletFile newFile = newDatafile.insert();
+
+    Long compactionIdToWrite = null;
+
     synchronized (tablet) {
       t1 = System.currentTimeMillis();
 
@@ -445,6 +449,10 @@ class DatafileManager {
 
       lastLocation = tablet.resetLastLocation();
 
+      if (compactionId != null && Collections.disjoint(selectedFiles, 
datafileSizes.keySet())) {
+        compactionIdToWrite = compactionId;
+      }
+
       t2 = System.currentTimeMillis();
     }
 
@@ -453,10 +461,10 @@ class DatafileManager {
     if (!filesInUseByScans.isEmpty())
       log.debug("Adding scan refs to metadata {} {}", extent, 
filesInUseByScans);
     ManagerMetadataUtil.replaceDatafiles(tablet.getContext(), extent, 
oldDatafiles,
-        filesInUseByScans, newFile, compactionId, dfv,
+        filesInUseByScans, newFile, compactionIdToWrite, dfv,
         tablet.getTabletServer().getClientAddressString(), lastLocation,
         tablet.getTabletServer().getLock(), ecid);
-    tablet.setLastCompactionID(compactionId);
+    tablet.setLastCompactionID(compactionIdToWrite);
     removeFilesAfterScan(filesInUseByScans);
 
     if (log.isTraceEnabled()) {
diff --git 
a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java 
b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
index 9ce8ea9..fc61228 100644
--- a/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ExternalCompactionIT.java
@@ -60,7 +60,6 @@ import 
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
 import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
 import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
 import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
-import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.fate.util.UtilWaitThread;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
@@ -81,6 +80,12 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ExternalCompactionIT.class);
 
+  private static final int MAX_DATA = 1000;
+
+  /** Formats a row id as "r:" followed by the number zero-padded to four digits. */
+  private static String row(int r) {
+    String padded = String.format("%04d", r);
+    return "r:" + padded;
+  }
+
   @Override
   protected void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
     cfg.setProperty("tserver.compaction.major.service.cs1.planner",
@@ -151,7 +156,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
       verify(client, table1, 2);
 
       SortedSet<Text> splits = new TreeSet<>();
-      splits.add(new Text("r:4"));
+      splits.add(new Text(row(MAX_DATA / 2)));
       client.tableOperations().addSplits(table2, splits);
 
       compact(client, table2, 3, "DCQ2", true);
@@ -160,8 +165,44 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
     }
   }
 
+  // test times out, need to improve how a single tablet w/ lots of 
compactions is handled...
+  // currently waits 1 min between each compaction per tserver
+  @Ignore
   @Test
-  //@Ignore // waiting for solution to issue #2019
+  public void testManytablets() throws Exception {
+    final String tableName = "ectt4";
+    // one split every MAX_DATA / 200 rows
+    final int step = MAX_DATA / 200;
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+      SortedSet<Text> splitPoints = new TreeSet<>();
+      for (int r = step; r < MAX_DATA; r += step) {
+        splitPoints.add(new Text(row(r)));
+      }
+
+      createTable(client, tableName, "cs1", splitPoints);
+      writeData(client, tableName);
+
+      // four compactors on queue DCQ1, then the coordinator
+      for (int i = 0; i < 4; i++) {
+        cluster.exec(Compactor.class, "-q", "DCQ1");
+      }
+      cluster.exec(CompactionCoordinator.class);
+
+      compact(client, tableName, 3, "DCQ1", true);
+      verify(client, tableName, 3);
+    }
+  }
+
+  // CBUG add test that configures output file for external compaction
+
+  // CBUG add test that verifies iterators configured on table (not on user 
compaction) are used in
+  // external compaction
+
+  @Test
+  // @Ignore // waiting for solution to issue #2019
   public void testExternalCompactionDeadTServer() throws Exception {
     // Shut down the normal TServers
     getCluster().getProcesses().get(TABLET_SERVER).forEach(p -> {
@@ -254,7 +295,7 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
       }
 
       int expectedCount = 0;
-      for (int i = 0; i < 10; i++) {
+      for (int i = 0; i < MAX_DATA; i++) {
         if (i % modulus == 0)
           expectedCount++;
       }
@@ -286,11 +327,22 @@ public class ExternalCompactionIT extends 
ConfigurableMacBase {
 
   }
 
+  /**
+   * Creates {@code tableName} with the given initial splits and with SimpleCompactionDispatcher set
+   * as the table's compaction dispatcher, passing {@code service} as its "service" option.
+   */
+  private void createTable(AccumuloClient client, String tableName, String service,
+      SortedSet<Text> splits) throws Exception {
+    String dispatcherClass = SimpleCompactionDispatcher.class.getName();
+    NewTableConfiguration tableConfig = new NewTableConfiguration()
+        .setProperties(Map.of("table.compaction.dispatcher", dispatcherClass,
+            "table.compaction.dispatcher.opts.service", service))
+        .withSplits(splits);
+    client.tableOperations().create(tableName, tableConfig);
+  }
+
   private void writeData(AccumuloClient client, String table1) throws 
MutationsRejectedException,
       TableNotFoundException, AccumuloException, AccumuloSecurityException {
     try (BatchWriter bw = client.createBatchWriter(table1)) {
-      for (int i = 0; i < 10; i++) {
-        Mutation m = new Mutation("r:" + i);
+      for (int i = 0; i < MAX_DATA; i++) {
+        Mutation m = new Mutation(row(i));
         m.put("", "", "" + i);
         bw.addMutation(m);
       }

Reply via email to