(accumulo) 02/02: Merge remote-tracking branch 'upstream/elasticity' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 1688cecd5a8889bcb41cec2bdb45d2f063d1d9a9 Merge: 3849fffedd 202198a588 Author: Dave Marion AuthorDate: Mon Mar 4 20:15:25 2024 + Merge remote-tracking branch 'upstream/elasticity' into elasticity .../apache/accumulo/core/logging/TabletLogger.java | 49 ++- .../coordinator/CompactionCoordinator.java | 27 +- .../coordinator/commit/CommitCompaction.java | 4 + .../manager/tableOps/compact/CompactionDriver.java | 22 +- .../manager/tableOps/split/UpdateTablets.java | 42 ++- .../compaction/CompactionCoordinatorTest.java | 148 ++ .../manager/tableOps/split/UpdateTabletsTest.java | 328 + .../apache/accumulo/test/functional/SplitIT.java | 54 test/src/main/resources/log4j2-test.properties | 3 + 9 files changed, 636 insertions(+), 41 deletions(-)
(accumulo) branch elasticity updated (202198a588 -> 1688cecd5a)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from 202198a588 Updates compaction to use TabletLogger (#4333) add d91d016211 Optimized logic for getting a random TabletServer connection (#4309) add f3d5fb01d7 Merge branch '2.1' new 3849fffedd Merge branch 'main' into elasticity new 1688cecd5a Merge remote-tracking branch 'upstream/elasticity' into elasticity The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../apache/accumulo/test/TransportCachingIT.java | 42 .../test/functional/MemoryStarvedScanIT.java | 41 +--- 8 files changed, 182 insertions(+), 183 deletions(-)
(accumulo) 01/02: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 3849fffeddeb29ec92b74926c15e756f182bb7c3 Merge: 422d48a432 f3d5fb01d7 Author: Dave Marion AuthorDate: Mon Mar 4 20:14:59 2024 + Merge branch 'main' into elasticity .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../apache/accumulo/test/TransportCachingIT.java | 42 .../test/functional/MemoryStarvedScanIT.java | 41 +--- 8 files changed, 182 insertions(+), 183 deletions(-) diff --cc test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java index 8273f8e5b8,59b9a535b8..0becd57120 --- a/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/MemoryStarvedScanIT.java @@@ -31,6 -30,6 +30,7 @@@ import java.util.Iterator import java.util.List; import java.util.Map; import java.util.Map.Entry; ++import java.util.Optional; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.DoubleAdder; @@@ -41,23 -39,12 +41,21 @@@ import org.apache.accumulo.core.client. 
import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.clientImpl.ClientContext; - import org.apache.accumulo.core.clientImpl.ThriftTransportKey; +import org.apache.accumulo.core.clientImpl.thrift.ClientService.Client; +import org.apache.accumulo.core.clientImpl.thrift.TInfo; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.iterators.WrappingIterator; +import org.apache.accumulo.core.lock.ServiceLock; - import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath; ++import org.apache.accumulo.core.lock.ServiceLockData; +import org.apache.accumulo.core.lock.ServiceLockData.ThriftService; import org.apache.accumulo.core.metrics.MetricsProducer; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; - import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.harness.MiniClusterConfigurationCallback; import org.apache.accumulo.harness.SharedMiniClusterBase; import org.apache.accumulo.minicluster.MemoryUnit; @@@ -67,7 -54,6 +65,8 @@@ import org.apache.accumulo.test.metrics import org.apache.accumulo.test.metrics.TestStatsDSink; import org.apache.accumulo.test.metrics.TestStatsDSink.Metric; import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.transport.TTransport; ++import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@@ -75,6 -61,6 +74,8 @@@ import org.junit.jupiter.api.Test import org.slf4j.Logger; import org.slf4j.LoggerFactory; ++import com.google.common.net.HostAndPort; ++ public class 
MemoryStarvedScanIT extends SharedMiniClusterBase { public static class MemoryStarvedITConfiguration implements MiniClusterConfigurationCallback { @@@ -187,25 -173,10 +188,35 @@@ } static void freeServerMemory(AccumuloClient client) throws Exception { -// Instantiating this class on the TabletServer will free the memory as it -// frees the buffers created by the MemoryConsumingIterator in its constructor. - client.instanceOperations().testClassLoad(MemoryFreeingIterator.class.getName(), -WrappingIterator.class.getName()); + ++// This does not call ThriftClientTypes.CLIENT.execute because ++// we only want to communicate with the TabletServer for this test +final ClientContext context = (ClientContext) client; +final long rpcTimeout = context.getClientTimeoutInMillis(); - final ArrayList servers = new ArrayList<>(); +final String serverPath = context.getZooKeeperRoot() + Constants.ZTSERVERS; +final ZooCache zc = context.getZooCache(); + +for (String server : zc.getChildren(serverPath)) { - ServiceLockPath zLocPath = ServiceLock.path(serverPath + "/" + server); - zc.getLockData(zLocPath).map(sld -> sld.getAddress(ThriftService.CLIENT)) - .map(address -> new ThriftTransportKey(address, rpcTimeout, context)) - .ifPresent(serve
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit f3d5fb01d701a6932d37d2f67f52cc0eefa64d50 Merge: ebf7054d1f d91d016211 Author: Dave Marion AuthorDate: Mon Mar 4 18:14:16 2024 + Merge branch '2.1' .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 8 files changed, 158 insertions(+), 170 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java index f332a09492,f4c7047d6d..0f84154a15 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java @@@ -24,9 -24,10 +24,10 @@@ import java.util.Objects import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; import com.google.common.annotations.VisibleForTesting; +import com.google.common.net.HostAndPort; @VisibleForTesting public class ThriftTransportKey { @@@ -54,12 -58,18 +58,18 @@@ this.saslParams = saslParams; if (saslParams != null && sslParams != null) { // TSasl and TSSL transport factories don't play nicely together - throw new RuntimeException("Cannot use both SSL and SASL thrift transports"); + throw new IllegalArgumentException("Cannot use both SSL and SASL thrift transports"); } - this.hash = Objects.hash(server, timeout, sslParams, saslParams); + this.hash = Objects.hash(type, server, 
timeout, sslParams, saslParams); } - HostAndPort getServer() { + @VisibleForTesting + public ThriftClientTypes getType() { + return type; + } + + @VisibleForTesting + public HostAndPort getServer() { return server; } diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java index d1bc17e945,a3d38aa10a..b3f205fa2a --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java @@@ -41,6 -41,8 +41,7 @@@ import java.util.function.LongSupplier import java.util.function.Supplier; import org.apache.accumulo.core.rpc.ThriftUtil; + import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.threads.Threads; import org.apache.thrift.TConfiguration; @@@ -49,9 -51,7 +50,8 @@@ import org.apache.thrift.transport.TTra import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.net.HostAndPort; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@@ -109,71 -110,40 +109,40 @@@ public class ThriftTransportPool return pool; } - public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context) - throws TTransportException { - ThriftTransportKey cacheKey = new ThriftTransportKey(location, milliseconds, context); + public TTransport getTransport(ThriftClientTypes type, HostAndPort location, long milliseconds, + ClientContext context, boolean preferCached) throws TTransportException { - CachedConnection connection = connectionPool.reserveAny(cacheKey); - - if (connection != null) { - log.trace("Using existing connection to {}", cacheKey.getServer()); - return connection.transport; - } else { - return createNewTransport(cacheKey); + ThriftTransportKey 
cacheKey = new ThriftTransportKey(type, location, milliseconds, context); + if (preferCached) { + CachedConnection connection = connectionPool.reserveAny(cacheKey); + if (connection != null) { + log.trace("Using existing connection to {}", cacheKey.getServer()); + return connection.transport; + } } + return createNewTransport(cacheKey); } - @VisibleForTesting - public Pair getAnyTransport(List servers, - boolean preferCachedConnection) throws TTransportException { - - servers = new ArrayList<>(servers); - - if (preferCachedConnection) { - HashSet serversSet = new HashSet<>(servers)
(accumulo) branch main updated (ebf7054d1f -> f3d5fb01d7)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from ebf7054d1f Merge branch '2.1' add d91d016211 Optimized logic for getting a random TabletServer connection (#4309) new f3d5fb01d7 Merge branch '2.1' The 1 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 105 +++- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 8 files changed, 158 insertions(+), 170 deletions(-)
(accumulo) branch elasticity updated: Updates compaction to use TabletLogger (#4333)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 202198a588 Updates compaction to use TabletLogger (#4333) 202198a588 is described below commit 202198a588aae1096611ff9b5513075054e2a876 Author: Keith Turner AuthorDate: Mon Mar 4 10:55:50 2024 -0500 Updates compaction to use TabletLogger (#4333) --- .../apache/accumulo/core/logging/TabletLogger.java | 49 +++--- .../coordinator/CompactionCoordinator.java | 9 ++-- .../coordinator/commit/CommitCompaction.java | 4 ++ .../manager/tableOps/compact/CompactionDriver.java | 22 -- test/src/main/resources/log4j2-test.properties | 3 ++ 5 files changed, 55 insertions(+), 32 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java index 349d29b19f..e76c62a6c9 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/TabletLogger.java @@ -27,14 +27,16 @@ import java.util.SortedSet; import java.util.UUID; import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.compaction.CompactableFile; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.metadata.CompactableFileImpl; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.TabletFile; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import 
org.apache.accumulo.core.spi.compaction.CompactionJob; import org.apache.accumulo.core.spi.compaction.CompactionKind; import org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -127,40 +129,36 @@ public class TabletLogger { cf -> CompactableFileImpl.toStoredTabletFile(cf).toMinimalString()); } - public static void selected(KeyExtent extent, CompactionKind kind, + public static void selected(FateId fateId, KeyExtent extent, Collection inputs) { -fileLog.trace("{} changed compaction selection set for {} new set {}", extent, kind, +fileLog.trace("Selected files {} {} {}", extent, fateId, Collections2.transform(inputs, StoredTabletFile::toMinimalString)); } - public static void compacting(KeyExtent extent, CompactionJob job, CompactionConfig config) { + public static void compacting(TabletMetadata tabletMetadata, ExternalCompactionId cid, + String compactorAddress, CompactionJob job) { if (fileLog.isDebugEnabled()) { - if (config == null) { -fileLog.debug("Compacting {} on {} for {} from {} size {}", extent, job.getGroup(), -job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles())); + if (job.getKind() == CompactionKind.USER) { +var fateId = tabletMetadata.getSelectedFiles().getFateId(); +fileLog.debug( +"Compacting {} driver:{} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}", +tabletMetadata.getExtent(), fateId, cid, job.getGroup(), compactorAddress, +job.getPriority(), getSize(job.getFiles()), job.getKind(), +asMinimalString(job.getFiles())); } else { -fileLog.debug("Compacting {} on {} for {} from {} size {} config {}", extent, -job.getGroup(), job.getKind(), asMinimalString(job.getFiles()), getSize(job.getFiles()), -config); +fileLog.debug( +"Compacting {} id:{} group:{} compactor:{} priority:{} size:{} kind:{} files:{}", +tabletMetadata.getExtent(), cid, job.getGroup(), compactorAddress, job.getPriority(), +getSize(job.getFiles()), job.getKind(), asMinimalString(job.getFiles())); } } } - public static void compacted(KeyExtent 
extent, CompactionJob job, StoredTabletFile output) { -fileLog.debug("Compacted {} for {} created {} from {}", extent, job.getKind(), output, -asMinimalString(job.getFiles())); - } - - public static void compactionFailed(KeyExtent extent, CompactionJob job, - CompactionConfig config) { -fileLog.debug("Failed to compact: extent: {}, input files: {}, iterators: {}", extent, -asMinimalString(job.getFiles()), config.getIterators()); - } - - public static void externalCompactionFailed(KeyExtent extent, ExternalCompactionId id, - CompactionJob job, CompactionConfig config) { -fileLog.debug("Failed to compact: id: {}, extent: {}, input files: {},
(accumulo) branch elasticity updated: Considers all tablet metadata columns in split code (#4323)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 1d7f6937fc Considers all tablet metadata columns in split code (#4323) 1d7f6937fc is described below commit 1d7f6937fcd4a122978486c10832b37c62effea1 Author: Keith Turner AuthorDate: Mon Mar 4 10:31:48 2024 -0500 Considers all tablet metadata columns in split code (#4323) Made the following changes to the split code that adds new tablets and updates the existing tablet. * fixed potential NPE w/ tablet operation id check by reversing order of equals check * Throws IllegalStateException when attempting to split tablet with merged or cloned markers * Removed adding wals when creating new tablets in split; it's not expected that the parent tablet would have wals and this is checked earlier * Deleted any user compaction requested, hosting requested, suspended, or last columns in the parent tablet Added a unit test that attempts to exercise the split code with all tablet columns. The unit test also has a set of tablet columns that were verified to work with split and it is checked against the set of columns in the code. The purpose of this test is to fail when a new column is added to ensure that split is considered. Was a bit uncertain about deleting the last location and suspend. Those columns either need to be deleted from the parent tablet or added to the new tablets being created. The current code was doing neither. Decided to delete them as the new tablets have a different range and are conceptually different tablets than the parent. 
--- .../manager/tableOps/split/UpdateTablets.java | 42 ++- .../manager/tableOps/split/UpdateTabletsTest.java | 328 + .../apache/accumulo/test/functional/SplitIT.java | 54 3 files changed, 421 insertions(+), 3 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java index 2fe3c56399..f2d1501ae5 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/UpdateTablets.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; @@ -82,7 +83,7 @@ public class UpdateTablets extends ManagerRepo { } } -Preconditions.checkState(tabletMetadata.getOperationId().equals(opid), +Preconditions.checkState(opid.equals(tabletMetadata.getOperationId()), "Tablet %s does not have expected operation id %s it has %s", splitInfo.getOriginal(), opid, tabletMetadata.getOperationId()); @@ -94,6 +95,13 @@ public class UpdateTablets extends ManagerRepo { "Tablet unexpectedly had walogs %s %s %s", fateId, tabletMetadata.getLogs(), tabletMetadata.getExtent()); +Preconditions.checkState(!tabletMetadata.hasMerged(), +"Tablet unexpectedly has a merged marker %s %s", fateId, tabletMetadata.getExtent()); + +Preconditions.checkState(tabletMetadata.getCloned() == null, +"Tablet unexpectedly has a cloned marker %s %s %s", fateId, tabletMetadata.getCloned(), +tabletMetadata.getExtent()); + var newTablets = splitInfo.getTablets(); var newTabletsFiles = getNewTabletFiles(newTablets, tabletMetadata, @@ -120,7 +128,7 @@ public class UpdateTablets extends ManagerRepo { newTablets.forEach(extent -> tabletsFiles.put(extent, new HashMap<>())); -// determine while files overlap 
which tablets and their estimated sizes +// determine which files overlap which tablets and their estimated sizes tabletMetadata.getFilesMap().forEach((file, dataFileValue) -> { FileUtil.FileInfo fileInfo = fileInfoProvider.apply(file); @@ -187,6 +195,7 @@ public class UpdateTablets extends ManagerRepo { mutator.putTime(tabletMetadata.getTime()); tabletMetadata.getFlushId().ifPresent(mutator::putFlushId); mutator.putPrevEndRow(newExtent.prevEndRow()); + tabletMetadata.getCompacted().forEach(mutator::putCompacted); tabletMetadata.getCompacted().forEach(compactedFateId -> log @@ -195,7 +204,6 @@ public class UpdateTablets extends ManagerRepo { mutator.putTabletAvailability(tabletMetadata.getTabletAvailability()); tabletMetadata.getLoaded().forEach((k, v) -> mutator.putBulkFile(k.getTabletFile(), v)); -tabletMetadata.getLogs().forEach(mutator::putWal); newTabletsFiles.
(accumulo) branch elasticity updated: Unit test compaction reservation and deny offline (#4334)
This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/elasticity by this push: new 01b0ff9356 Unit test compaction reservation and deny offline (#4334) 01b0ff9356 is described below commit 01b0ff9356b851d8f47368989407fa610441c23d Author: Keith Turner AuthorDate: Mon Mar 4 10:30:10 2024 -0500 Unit test compaction reservation and deny offline (#4334) Changes the compaction coordinator to not return jobs for a table that is offline. The point where this check was added to the coordinator needed unit test, so also added the unit test. --- .../coordinator/CompactionCoordinator.java | 18 ++- .../compaction/CompactionCoordinatorTest.java | 148 + 2 files changed, 160 insertions(+), 6 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index ebd5d8f764..a07ff50bc4 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -74,6 +74,7 @@ import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.core.iterators.user.HasExternalCompactionsFilter; import org.apache.accumulo.core.iteratorsImpl.system.SystemIteratorUtil; +import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.CompactableFileImpl; import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.StoredTabletFile; @@ -124,6 +125,7 @@ import com.github.benmanes.caffeine.cache.Cache; import 
com.github.benmanes.caffeine.cache.CacheLoader; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.Weigher; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.net.HostAndPort; @@ -382,9 +384,9 @@ public class CompactionCoordinator } - // ELASTICITY_TODO unit test this code - private boolean canReserveCompaction(TabletMetadata tablet, CompactionJob job, - Set jobFiles) { + @VisibleForTesting + public static boolean canReserveCompaction(TabletMetadata tablet, CompactionKind kind, + Set jobFiles, ServerContext ctx) { if (tablet == null) { // the tablet no longer exist @@ -395,6 +397,10 @@ public class CompactionCoordinator return false; } +if (ctx.getTableState(tablet.getTableId()) != TableState.ONLINE) { + return false; +} + if (!tablet.getFiles().containsAll(jobFiles)) { return false; } @@ -406,7 +412,7 @@ public class CompactionCoordinator return false; } -switch (job.getKind()) { +switch (kind) { case SYSTEM: var userRequestedCompactions = tablet.getUserCompactionsRequested().size(); if (userRequestedCompactions > 0) { @@ -427,7 +433,7 @@ public class CompactionCoordinator } break; default: -throw new UnsupportedOperationException("Not currently handling " + job.getKind()); +throw new UnsupportedOperationException("Not currently handling " + kind); } return true; @@ -508,7 +514,7 @@ public class CompactionCoordinator try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { var extent = metaJob.getTabletMetadata().getExtent(); -if (!canReserveCompaction(tabletMetadata, metaJob.getJob(), jobFiles)) { +if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx)) { return null; } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java 
b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java index 770405863e..4c0b2b1d52 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/compaction/CompactionCoordinatorTest.java @@ -18,10 +18,18 @@ */ package org.apache.accumulo.manager.compaction; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.Colum
(accumulo) branch 2.1 updated: Optimized logic for getting a random TabletServer connection (#4309)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new d91d016211 Optimized logic for getting a random TabletServer connection (#4309) d91d016211 is described below commit d91d0162115ae66112a104278bcd14e8085936d3 Author: Dave Marion AuthorDate: Mon Mar 4 09:16:58 2024 -0500 Optimized logic for getting a random TabletServer connection (#4309) The previous logic in this class would gather all of the Tserver ZNodes in ZooKeeper, then get the data for each ZNode and validate their ServiceLock. Then, after all of that it would randomly pick one of the TabletServers to connect to. It did this through the ZooCache object which on an initial connection would be empty and cause a lot of back and forth to ZooKeeper. The side effect of this is that the ZooCache would be populated with TabletServer information. This change modifies TServerClient such that it no longer populates ZooCache information for each TabletServer and modifies the default logic for getting a connection to a TabletServer. The new logic will make 3 calls to ZooKeeper in the best case scenario, one to get the list of TServer ZNodes in Zookeeper, one to get the ServiceLock for a random TServer and another to get the ZNode data for one of it. This is all done through ZooCache, so it is lazily populated over time instead of incurring the penalty when getting the first TabletServer connection. 
Fixes #4303 --- .../core/clientImpl/ThriftTransportKey.java| 29 -- .../core/clientImpl/ThriftTransportPool.java | 110 + .../org/apache/accumulo/core/rpc/ThriftUtil.java | 7 +- .../accumulo/core/rpc/clients/TServerClient.java | 73 -- .../core/rpc/clients/ThriftClientTypes.java| 6 +- .../core/clientImpl/ThriftTransportKeyTest.java| 25 ++--- .../coordinator/CompactionCoordinator.java | 4 +- .../apache/accumulo/test/TransportCachingIT.java | 42 8 files changed, 142 insertions(+), 154 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java index 8be320dcc5..f4c7047d6d 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java @@ -24,12 +24,14 @@ import java.util.Objects; import org.apache.accumulo.core.rpc.SaslConnectionParams; import org.apache.accumulo.core.rpc.SslConnectionParams; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; import org.apache.accumulo.core.util.HostAndPort; import com.google.common.annotations.VisibleForTesting; @VisibleForTesting public class ThriftTransportKey { + private final ThriftClientTypes type; private final HostAndPort server; private final long timeout; private final SslConnectionParams sslParams; @@ -38,16 +40,18 @@ public class ThriftTransportKey { private final int hash; @VisibleForTesting - public ThriftTransportKey(HostAndPort server, long timeout, ClientContext context) { -this(server, timeout, context.getClientSslParams(), context.getSaslParams()); + public ThriftTransportKey(ThriftClientTypes type, HostAndPort server, long timeout, + ClientContext context) { +this(type, server, timeout, context.getClientSslParams(), context.getSaslParams()); } /** * Visible only for testing */ - ThriftTransportKey(HostAndPort server, long timeout, SslConnectionParams sslParams, - 
SaslConnectionParams saslParams) { + ThriftTransportKey(ThriftClientTypes type, HostAndPort server, long timeout, + SslConnectionParams sslParams, SaslConnectionParams saslParams) { requireNonNull(server, "location is null"); +this.type = type; this.server = server; this.timeout = timeout; this.sslParams = sslParams; @@ -56,14 +60,21 @@ public class ThriftTransportKey { // TSasl and TSSL transport factories don't play nicely together throw new RuntimeException("Cannot use both SSL and SASL thrift transports"); } -this.hash = Objects.hash(server, timeout, sslParams, saslParams); +this.hash = Objects.hash(type, server, timeout, sslParams, saslParams); } - HostAndPort getServer() { + @VisibleForTesting + public ThriftClientTypes getType() { +return type; + } + + @VisibleForTesting + public HostAndPort getServer() { return server; } - long getTimeout() { + @VisibleForTesting + public long getTimeout() { return timeout; } @@ -81,7 +92,7 @@ public class ThriftTransportKey { return false; } ThriftTransportKey ttk = (ThriftTransportKey) o; -r
(accumulo) 01/01: Merge branch 'main' into elasticity
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 422d48a4325bb11b9aaae1ec8c61bdae4afca214 Merge: afe2857935 ebf7054d1f Author: Dave Marion AuthorDate: Mon Mar 4 12:51:34 2024 + Merge branch 'main' into elasticity assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 8 ++-- .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 25 insertions(+), 7 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java index 0c465564d9,e00570154a..c4ebf8c7ec --- a/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java @@@ -39,23 -39,13 +39,24 @@@ import edu.umd.cs.findbugs.annotations. 
public class ClusterConfigParser { private static final String PROPERTY_FORMAT = "%s=\"%s\"%n"; - private static final String[] SECTIONS = new String[] {"manager", "monitor", "gc", "tserver"}; - - private static final Set VALID_CONFIG_KEYS = Set.of("manager", "monitor", "gc", "tserver", - "tservers_per_host", "sservers_per_host", "compaction.coordinator", "compactors_per_host"); + private static final String COMPACTOR_PREFIX = "compactor."; ++ private static final String COMPACTORS_PER_HOST_KEY = "compactors_per_host"; + private static final String GC_KEY = "gc"; + private static final String MANAGER_KEY = "manager"; + private static final String MONITOR_KEY = "monitor"; + private static final String SSERVER_PREFIX = "sserver."; + private static final String SSERVERS_PER_HOST_KEY = "sservers_per_host"; + private static final String TSERVER_PREFIX = "tserver."; + private static final String TSERVERS_PER_HOST_KEY = "tservers_per_host"; + + private static final String[] UNGROUPED_SECTIONS = + new String[] {MANAGER_KEY, MONITOR_KEY, GC_KEY}; + - private static final Set VALID_CONFIG_KEYS = - Set.of(MANAGER_KEY, MONITOR_KEY, GC_KEY, SSERVERS_PER_HOST_KEY, TSERVERS_PER_HOST_KEY); ++ private static final Set VALID_CONFIG_KEYS = Set.of(MANAGER_KEY, MONITOR_KEY, GC_KEY, ++ SSERVERS_PER_HOST_KEY, TSERVERS_PER_HOST_KEY, COMPACTORS_PER_HOST_KEY); private static final Set VALID_CONFIG_PREFIXES = - Set.of("compaction.compactor.", "sserver."); + Set.of(COMPACTOR_PREFIX, SSERVER_PREFIX, TSERVER_PREFIX); private static final Predicate VALID_CONFIG_SECTIONS = section -> VALID_CONFIG_KEYS.contains(section) diff --cc core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java index 148b5e4f24,1410dc569a..189e48afc3 --- a/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java @@@ -63,15 -63,17 +63,16 @@@ public class 
ClusterConfigParserTest assertEquals("localhost1 localhost2", contents.get("monitor")); assertTrue(contents.containsKey("gc")); assertEquals("localhost", contents.get("gc")); -assertTrue(contents.containsKey("tserver")); -assertEquals("localhost1 localhost2 localhost3 localhost4", contents.get("tserver")); -assertFalse(contents.containsKey("compaction")); -assertFalse(contents.containsKey("compaction.coordinator")); -assertFalse(contents.containsKey("compaction.compactor")); -assertFalse(contents.containsKey("compaction.compactor.queue")); -assertFalse(contents.containsKey("compaction.compactor.q1")); -assertFalse(contents.containsKey("compaction.compactor.q2")); +assertFalse(contents.containsKey("tserver")); +assertTrue(contents.containsKey("tserver.default")); +assertEquals("localhost1 localhost2 localhost3 localhost4", contents.get("tserver.default")); +assertFalse(contents.containsKey("compactor")); +assertFalse(contents.containsKey("compactor.queue")); +assertFalse(contents.containsKey("compactor.q1")); +assertFalse(contents.containsKey("compactor.q2")); assertFalse(contents.containsKey("tservers_per_host")); assertFalse(contents.containsKey("sservers_per_host")); + assertFalse(contents.containsKey("compactors_per_host")); } @Test @@@ -83,7 -85,7 +84,7 @@@ Map contents = ClusterConfigParser.parseConfiguration(new File(configFile.toURI()).getAbsolutePath()); --assertEquals(13, contents.size()); ++assertEquals(14, contents.size()); assertTrue(contents.containsKey("manager")); assertEquals("localhost1 localhost2", contents.get("manager")); assertTrue(contents.containsKey("monitor")); @@@ -168,10 -170,10 +171,11 @@@
(accumulo) branch elasticity updated (afe2857935 -> 422d48a432)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git from afe2857935 Merge branch 'main' into elasticity add 18b745466e Added compactors_per_host to accumulo-cluster script (#4329) add ebf7054d1f Merge branch '2.1' new 422d48a432 Merge branch 'main' into elasticity The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 8 ++-- .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 25 insertions(+), 7 deletions(-)
(accumulo) 01/01: Merge branch '2.1'
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git commit ebf7054d1f6bf62d5170dcbbb07c7b0ee572cf2c Merge: c976af383f 18b745466e Author: Dave Marion AuthorDate: Mon Mar 4 12:34:51 2024 +0000 Merge branch '2.1' assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 5 - .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 23 insertions(+), 6 deletions(-)
(accumulo) branch main updated (c976af383f -> ebf7054d1f)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a change to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git from c976af383f Merge branch '2.1' add 18b745466e Added compactors_per_host to accumulo-cluster script (#4329) new ebf7054d1f Merge branch '2.1' The 1 revision listed above as "new" is entirely new to this repository and will be described in a separate email. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 5 - .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 23 insertions(+), 6 deletions(-)
(accumulo) branch 2.1 updated: Added compactors_per_host to accumulo-cluster script (#4329)
This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git The following commit(s) were added to refs/heads/2.1 by this push: new 18b745466e Added compactors_per_host to accumulo-cluster script (#4329) 18b745466e is described below commit 18b745466eee10c41f72012908b867005ca89881 Author: Dave Marion AuthorDate: Mon Mar 4 07:22:46 2024 -0500 Added compactors_per_host to accumulo-cluster script (#4329) --- assemble/bin/accumulo-cluster | 10 +- .../accumulo/core/conf/cluster/ClusterConfigParser.java | 5 - .../accumulo/core/conf/cluster/ClusterConfigParserTest.java | 11 --- .../core/conf/cluster/cluster-with-optional-services.yaml | 3 ++- 4 files changed, 23 insertions(+), 6 deletions(-) diff --git a/assemble/bin/accumulo-cluster b/assemble/bin/accumulo-cluster index c9936cb78b..5dd9de7e4e 100755 --- a/assemble/bin/accumulo-cluster +++ b/assemble/bin/accumulo-cluster @@ -119,6 +119,11 @@ function parse_config { if [[ -z $NUM_SSERVERS ]]; then echo "INFO: ${NUM_SSERVERS} sservers will be started per host" fi + + if [[ -z $NUM_COMPACTORS ]]; then +echo "INFO: ${NUM_COMPACTORS} compactors will be started per host" + fi + } function control_service() { @@ -130,6 +135,7 @@ function control_service() { last_instance_id=1 [[ $service == "tserver" ]] && last_instance_id=${NUM_TSERVERS:-1} [[ $service == "sserver" ]] && last_instance_id=${NUM_SSERVERS:-1} + [[ $service == "compactor" ]] && last_instance_id=${NUM_COMPACTORS:-1} for ((inst_id = 1; inst_id <= last_instance_id; inst_id++)); do ACCUMULO_SERVICE_INSTANCE="" @@ -510,10 +516,12 @@ tserver: # to start on each host. If the following variables are not set, then they default to 1. # If the environment variable NUM_TSERVERS is set when running accumulo_cluster # then its value will override what is set in this file for tservers_per_host. Likewise if -# NUM_SSERVERS is set then it will override sservers_per_host. 
+# NUM_SSERVERS or NUM_COMPACTORS are set then it will override sservers_per_host and +# compactors_per_host. # tservers_per_host: 1 sservers_per_host: 1 +compactors_per_host: 1 EOF ;; diff --git a/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java b/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java index 4f41cbef3e..5790fa7fd3 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParser.java @@ -42,7 +42,7 @@ public class ClusterConfigParser { private static final String[] SECTIONS = new String[] {"manager", "monitor", "gc", "tserver"}; private static final Set VALID_CONFIG_KEYS = Set.of("manager", "monitor", "gc", "tserver", - "tservers_per_host", "sservers_per_host", "compaction.coordinator"); + "tservers_per_host", "sservers_per_host", "compaction.coordinator", "compactors_per_host"); private static final Set VALID_CONFIG_PREFIXES = Set.of("compaction.compactor.", "sserver."); @@ -150,6 +150,9 @@ public class ClusterConfigParser { String numSservers = config.getOrDefault("sservers_per_host", "1"); out.print("NUM_SSERVERS=\"${NUM_SSERVERS:=" + numSservers + "}\"\n"); +String numCompactors = config.getOrDefault("compactors_per_host", "1"); +out.print("NUM_COMPACTORS=\"${NUM_COMPACTORS:=" + numCompactors + "}\"\n"); + out.flush(); } diff --git a/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java b/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java index ef2c2382bc..1410dc569a 100644 --- a/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java +++ b/core/src/test/java/org/apache/accumulo/core/conf/cluster/ClusterConfigParserTest.java @@ -73,6 +73,7 @@ public class ClusterConfigParserTest { assertFalse(contents.containsKey("compaction.compactor.q2")); 
assertFalse(contents.containsKey("tservers_per_host")); assertFalse(contents.containsKey("sservers_per_host")); +assertFalse(contents.containsKey("compactors_per_host")); } @Test @@ -84,7 +85,7 @@ public class ClusterConfigParserTest { Map contents = ClusterConfigParser.parseConfiguration(new File(configFile.toURI()).getAbsolutePath()); -assertEquals(12, contents.size()); +assertEquals(13, contents.size()); assertTrue(contents.containsKey("manager")); assertEquals("localhost1 localhost2", contents.get("manager")); assertTrue(contents.containsKey("monitor")); @@ -111,7 +112,9 @@ public class ClusterConfigParserTest { assertTrue(contents.containsKey("tservers_per_host")); assertEquals("2", contents.get("tservers_per_host")); assertTr