milleruntime commented on a change in pull request #1605:
URL: https://github.com/apache/accumulo/pull/1605#discussion_r440381804



##########
File path: 
core/src/main/java/org/apache/accumulo/core/client/admin/compaction/TooManyDeletesSelector.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.accumulo.core.client.admin.compaction;
+
+import static 
org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer.DELETES_STAT;
+import static 
org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer.TOTAL_STAT;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.rfile.RFile.WriterOptions;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer;
+
+/**
+ * This compaction selector works in concert with the {@link 
DeletesSummarizer}. Using the
+ * statistics from DeleteSummarizer this strategy will compact all files in a 
table when the number
+ * of deletes/non-deletes exceeds a threshold.
+ *
+ * <p>
+ * This strategy has two options. First the {@value #THRESHOLD_OPT} option 
allows setting the point
+ * at which a compaction will be triggered. This options defaults to {@value 
#THRESHOLD_OPT_DEFAULT}
+ * and must be in the range (0.0, 1.0]. The second option is {@value 
#PROCEED_ZERO_NO_SUMMARY_OPT}
+ * which determines if the strategy should proceed when a bulk imported file 
has no summary
+ * information.
+ *
+ * <p>
+ * If the delete summarizer was configured on a table that already had files, 
then those files will
+ * have not summary information. This strategy can still proceed in this 
situation. It will fall
+ * back to using Accumulo's estimated entries per file in this case. For the 
files without summary
+ * information the estimated number of deletes will be zero. This fall back 
method will
+ * underestimate deletes which will not lead to false positives, except for 
the case of bulk
+ * imported files. Accumulo estimates that bulk imported files have zero 
entires. The second option
+ * {@value #PROCEED_ZERO_NO_SUMMARY_OPT} determines if this strategy should 
proceed when it sees
+ * bulk imported files that do not have summary data. This option defaults to
+ * {@value #PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT}.
+ *
+ * <p>
+ * Bulk files can be generated with summary information by calling
+ * {@code AccumuloFileOutputFormat#setSummarizers(JobConf, 
SummarizerConfiguration...)} or
+ * {@link WriterOptions#withSummarizers(SummarizerConfiguration...)}
+ *
+ * <p>
+ * When using this feature, its important to ensure summary cache is on and 
the summaries fit in the
+ * cache.
+ *
+ * @since 2.1.0
+ */
+public class TooManyDeletesSelector implements CompactionSelector {
+
+  private double threshold;
+
+  private boolean proceed_bns;
+
+  /**
+   * This option should be a floating point number between 1 and 0.
+   */
+  public static final String THRESHOLD_OPT = "threshold";
+
+  /**
+   * The default threshold.
+   */
+  public static final String THRESHOLD_OPT_DEFAULT = ".25";
+
+  public static final String PROCEED_ZERO_NO_SUMMARY_OPT = 
"proceed_zero_no_summary";
+
+  public static final String PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT = "false";
+
+  @Override
+  public void init(InitParamaters iparams) {
+    var options = iparams.getOptions();
+    this.threshold = Double.parseDouble(options.getOrDefault(THRESHOLD_OPT, 
THRESHOLD_OPT_DEFAULT));
+    if (threshold <= 0.0 || threshold > 1.0) {
+      throw new IllegalArgumentException(
+          "Threshold must be in range (0.0, 1.0], saw : " + threshold);
+    }
+
+    this.proceed_bns = Boolean.parseBoolean(
+        options.getOrDefault(PROCEED_ZERO_NO_SUMMARY_OPT, 
PROCEED_ZERO_NO_SUMMARY_OPT_DEFAULT));
+  }
+
+  @Override
+  public Selection select(SelectionParameters sparams) {
+
+    var tableConf = 
sparams.getEnvironment().getConfiguration(sparams.getTableId());
+
+    // TODO ISSUE could add a method to get props with prefix. That could be 
used to efficiently get

Review comment:
       Please open an issue to track this TODO.

##########
File path: 
core/src/main/java/org/apache/accumulo/core/client/admin/compaction/CompactionSelector.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.client.admin.compaction;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import org.apache.accumulo.core.client.PluginEnvironment;
+import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+/**
+ * This class select which files a user compaction will compact. It can also 
be configured per table

Review comment:
       Typo: "select" should be "selects".
   ```suggestion
    * This class selects which files a user compaction will compact. It can also be configured per table
   ```

##########
File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import 
org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.ExecutorManager;
+import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionService {
+  // TODO ISSUE move rate limiters to the compaction service level.
+  private final CompactionPlanner planner;
+  private final Map<CompactionExecutorId,CompactionExecutor> executors;
+  private final CompactionServiceId myId;
+  private Map<KeyExtent,Collection<SubmittedJob>> submittedJobs = new 
ConcurrentHashMap<>();
+  private ServerContext serverCtx;
+
+  private static final Logger log = 
LoggerFactory.getLogger(CompactionService.class);
+
+  // TODO ISSUE change thread pool sizes if compaction service config changes
+  public CompactionService(String serviceName, String plannerClass,
+      Map<String,String> serviceOptions, ServerContext sctx, 
TabletServerResourceManager tsrm) {
+
+    this.myId = CompactionServiceId.of(serviceName);
+    this.serverCtx = sctx;
+
+    try {
+      planner =
+          ConfigurationTypeHelper.getClassInstance(null, plannerClass, 
CompactionPlanner.class);
+    } catch (IOException | ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+
+    Map<CompactionExecutorId,CompactionExecutor> tmpExecutors = new 
HashMap<>();
+
+    planner.init(new CompactionPlanner.InitParameters() {
+
+      @Override
+      public ServiceEnvironment getServiceEnvironment() {
+        return new ServiceEnvironmentImpl(sctx);
+      }
+
+      @Override
+      public Map<String,String> getOptions() {
+        return serviceOptions;
+      }
+
+      @Override
+      public ExecutorManager getExecutorManager() {
+        return new ExecutorManager() {
+          @Override
+          public CompactionExecutorId createExecutor(String executorName, int 
threads) {
+            var ceid = CompactionExecutorId.of(serviceName + "." + 
executorName);
+            Preconditions.checkState(!tmpExecutors.containsKey(ceid));
+            tmpExecutors.put(ceid, new CompactionExecutor(ceid, threads, 
tsrm));
+            return ceid;
+          }
+        };
+      }
+
+      @Override
+      public String getFullyQualifiedOption(String key) {
+        return Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + serviceName 
+ ".opts." + key;
+      }
+    });
+
+    this.executors = Map.copyOf(tmpExecutors);
+
+    log.debug("Created new compaction service id:{} executors:{}", myId, 
executors.keySet());
+  }
+
+  private boolean reconcile(Set<CompactionJob> jobs, Collection<SubmittedJob> 
submitted) {
+    for (SubmittedJob submittedJob : submitted) {
+      // only read status once to avoid race conditions since multiple 
compares are done
+      var status = submittedJob.getStatus();
+      if (status == Status.QUEUED) {
+        if (!jobs.remove(submittedJob.getJob())) {
+          if (!submittedJob.cancel(Status.QUEUED)) {
+            return false;
+          }
+        }
+      } else if (status == Status.RUNNING) {
+        for (CompactionJob job : jobs) {
+          if (!Collections.disjoint(submittedJob.getJob().getFiles(), 
job.getFiles())) {
+            return false;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
+
+  public void compact(CompactionKind kind, Compactable compactable,
+      Consumer<Compactable> completionCallback) {
+    // TODO ISSUE this could take a while... could run this in a thread pool

Review comment:
       Please open an issue to track this TODO.

##########
File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import 
org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.ExecutorManager;
+import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionService {
+  // TODO ISSUE move rate limiters to the compaction service level.

Review comment:
       Please open an issue to track this TODO.

##########
File path: 
server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
##########
@@ -467,14 +465,11 @@ public String invalidMessage(String argument) {
       }
       case TABLE_COMPACT: {
         TableOperation tableOp = TableOperation.COMPACT;
-        validateArgumentCount(arguments, tableOp, 5);
+        // TODO ISSUE could have compatability mode for the old number of args

Review comment:
       Please open an issue to track this TODO.

##########
File path: 
server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionService.java
##########
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.compactions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Consumer;
+
+import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
+import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionJob;
+import org.apache.accumulo.core.spi.compaction.CompactionKind;
+import org.apache.accumulo.core.spi.compaction.CompactionPlan;
+import org.apache.accumulo.core.spi.compaction.CompactionPlanner;
+import 
org.apache.accumulo.core.spi.compaction.CompactionPlanner.PlanningParameters;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.ExecutorManager;
+import org.apache.accumulo.core.util.compaction.CompactionPlanImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServiceEnvironmentImpl;
+import org.apache.accumulo.tserver.TabletServerResourceManager;
+import org.apache.accumulo.tserver.compactions.SubmittedJob.Status;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CompactionService {
+  // TODO ISSUE move rate limiters to the compaction service level.
+  private final CompactionPlanner planner;
+  private final Map<CompactionExecutorId,CompactionExecutor> executors;
+  private final CompactionServiceId myId;
+  private Map<KeyExtent,Collection<SubmittedJob>> submittedJobs = new 
ConcurrentHashMap<>();
+  private ServerContext serverCtx;
+
+  private static final Logger log = 
LoggerFactory.getLogger(CompactionService.class);
+
+  // TODO ISSUE change thread pool sizes if compaction service config changes

Review comment:
       Please open an issue to track this TODO.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to