keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840098253
##########
File path: server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
##########
@@ -176,6 +178,25 @@ public void execute(String[] args) throws Exception {
}
+ if (opts.zapScanServers) {
+ String sserversPath = Constants.ZROOT + "/" + iid +
Constants.ZSSERVERS;
+ try {
+ List<String> children = zoo.getChildren(sserversPath);
+ for (String child : children) {
+ message("Deleting " + sserversPath + "/" + child + " from
zookeeper", opts);
+
+ var zLockPath = ServiceLock.path(sserversPath + "/" + child);
+ if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
+ if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) {
Review comment:
Is the `"tserver"` string correct?
##########
File path:
minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
##########
@@ -56,6 +56,7 @@
private Map<String,String> configuredSiteConig = new HashMap<>();
private Map<String,String> clientProps = new HashMap<>();
private int numTservers = 2;
+ private int numScanServers = 2;
Review comment:
This will make Mini Accumulo all of a sudden use more memory for an
optional feature. Wondering if this should default to zero.
##########
File path: test/src/main/resources/log4j2-test.properties
##########
@@ -79,7 +79,7 @@ logger.16.name =
org.apache.accumulo.server.util.ReplicationTableUtil
logger.16.level = trace
logger.17.name = org.apache.accumulo.core.clientImpl.ThriftScanner
-logger.17.level = info
+logger.17.level = debug
Review comment:
Do we want to keep this change?
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
##########
@@ -0,0 +1,1288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
+import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import
org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import
org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
+import
org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.compactions.Compactable;
+import org.apache.accumulo.tserver.compactions.CompactionManager;
+import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
+import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.commons.lang3.tuple.MutableTriple;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends TabletServer implements
TabletClientService.Iface {
+
+ static class ScanInformation extends MutableTriple<Long,KeyExtent,Tablet> {
+ private static final long serialVersionUID = 1L;
+
+ public Long getScanId() {
+ return getLeft();
+ }
+
+ public void setScanId(Long scanId) {
+ setLeft(scanId);
+ }
+
+ public KeyExtent getExtent() {
+ return getMiddle();
+ }
+
+ public void setExtent(KeyExtent extent) {
+ setMiddle(extent);
+ }
+
+ public Tablet getTablet() {
+ return getRight();
+ }
+
+ public void setTablet(Tablet tablet) {
+ setRight(tablet);
+ }
+ }
+
+ /**
+ * A compaction manager that does nothing
+ */
+ private static class ScanServerCompactionManager extends CompactionManager {
+
+ public ScanServerCompactionManager(ServerContext context,
+ CompactionExecutorsMetrics ceMetrics) {
+ super(new ArrayList<>(), context, ceMetrics);
+ }
+
+ @Override
+ public void compactableChanged(Compactable compactable) {}
+
+ @Override
+ public void start() {}
+
+ @Override
+ public CompactionServices getServices() {
+ return null;
+ }
+
+ @Override
+ public boolean isCompactionQueued(KeyExtent extent,
Set<CompactionServiceId> servicesUsed) {
+ return false;
+ }
+
+ @Override
+ public int getCompactionsRunning() {
+ return 0;
+ }
+
+ @Override
+ public int getCompactionsQueued() {
+ return 0;
+ }
+
+ @Override
+ public ExternalCompactionJob reserveExternalCompaction(String queueName,
long priority,
+ String compactorId, ExternalCompactionId externalCompactionId) {
+ return null;
+ }
+
+ @Override
+ public void registerExternalCompaction(ExternalCompactionId ecid,
KeyExtent extent,
+ CompactionExecutorId ceid) {}
+
+ @Override
+ public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+ KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long
fileSize,
+ long entries) {}
+
+ @Override
+ public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent
extentCompacted,
+ Map<KeyExtent,Tablet> currentTablets) {}
+
+ @Override
+ public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
+ return null;
+ }
+
+ @Override
+ public Collection<ExtCompMetric> getExternalMetrics() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void compactableClosed(KeyExtent extent, Set<CompactionServiceId>
servicesUsed,
+ Set<ExternalCompactionId> ecids) {}
+
+ }
+
+ public static class ScanServerCompactionExecutorMetrics extends
CompactionExecutorsMetrics {
+
+ @Override
+ protected void startUpdateThread() {}
+
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+ protected ThriftClientHandler handler;
+ private UUID serverLockUUID;
+ private final TabletMetadataLoader tabletMetadataLoader;
+ private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+ protected Set<StoredTabletFile> lockedFiles = new HashSet<>();
+ protected Map<StoredTabletFile,ReservedFile> reservedFiles = new
ConcurrentHashMap<>();
+ protected AtomicLong nextScanReservationId = new AtomicLong();
+
+ private static class TabletMetadataLoader implements
CacheLoader<KeyExtent,TabletMetadata> {
+
+ private final Ample ample;
+
+ private TabletMetadataLoader(Ample ample) {
+ this.ample = ample;
+ }
+
+ @Override
+ public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+ long t1 = System.currentTimeMillis();
+ var tm = ample.readTablet(keyExtent);
+ long t2 = System.currentTimeMillis();
+ LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+ return tm;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public Map<? extends KeyExtent,? extends TabletMetadata>
+ loadAll(Set<? extends KeyExtent> keys) {
+ long t1 = System.currentTimeMillis();
+ var tms = ample.readTablets().forTablets((Collection<KeyExtent>)
keys).build().stream()
+ .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm));
+ long t2 = System.currentTimeMillis();
+ LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
+ return tms;
+ }
+ }
+
+ public ScanServer(ServerOpts opts, String[] args) {
+ super(opts, args, true);
+
+ // Note: The way to control the number of concurrent scans that a
ScanServer will
+ // perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or
the number
+ // of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX.
+
+ long cacheExpiration =
+
getConfiguration().getTimeInMillis(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION);
+
+ long scanServerReservationExpiration =
+
getConfiguration().getTimeInMillis(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME);
+
+ tabletMetadataLoader = new TabletMetadataLoader(getContext().getAmple());
+
+ if (cacheExpiration == 0L) {
+ LOG.warn("Tablet metadata caching disabled, may cause excessive scans on
metadata table.");
+ tabletMetadataCache = null;
+ } else {
+ if (cacheExpiration < 60000) {
+ LOG.warn(
+ "Tablet metadata caching less than one minute, may cause excessive
scans on metadata table.");
+ }
+ tabletMetadataCache =
+ Caffeine.newBuilder().expireAfterWrite(cacheExpiration,
TimeUnit.MILLISECONDS)
Review comment:
Curious why choose Caffeine over Guava cache?
##########
File path:
test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
##########
@@ -149,6 +149,7 @@ public void testAccumuloClientBuilder() throws Exception {
props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers);
props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), user1);
props.put(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "22s");
+ props.put(ClientProperty.SCAN_SERVER_DISPATCHER_OPTS_PREFIX + "enabled",
"true");
Review comment:
What is the purpose of this?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]