This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to 
refs/heads/1451-external-compactions-feature by this push:
     new 3b9b9fa  Fixes #1989: Optimization in CompactionCoordinator startup to 
find tserver hosting tablet that is being compacted
3b9b9fa is described below

commit 3b9b9fa3105632aec5332cafbf857b5fdba4153d
Author: Dave Marion <dlmar...@apache.org>
AuthorDate: Wed Mar 31 15:31:07 2021 +0000

    Fixes #1989: Optimization in CompactionCoordinator startup to find tserver 
hosting tablet that is being compacted
---
 .../coordinator/CompactionCoordinator.java         | 79 +++++++++++++++++-----
 .../org/apache/accumulo/compactor/Compactor.java   |  4 +-
 2 files changed, 64 insertions(+), 19 deletions(-)

diff --git 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index 7e1a42a..895d131 100644
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -47,6 +47,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
 import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.CompactionStats;
@@ -213,27 +216,69 @@ public class CompactionCoordinator extends AbstractServer
         LOG.info("Found {} running external compactions", running.size());
         running.forEach((hp, job) -> {
           // Find the tserver that has this compaction id
-          // CBUG: Is there a more efficient way of finding the tablet server?
           boolean matchFound = false;
-          for (TServerInstance tsi : tservers) {
-            TabletClientService.Client client = null;
-            try {
-              client = getTabletServerConnection(tsi);
-              boolean tserverMatch = 
client.isRunningExternalCompaction(TraceUtil.traceInfo(),
-                  getContext().rpcCreds(), job.getExternalCompactionId(), 
job.getExtent());
-              if (tserverMatch) {
-                
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
-                    new RunningCompaction(job, 
ExternalCompactionUtil.getHostPortString(hp), tsi));
-                matchFound = true;
+
+          // Attempt to find the TServer hosting the tablet based on the 
metadata table
+          // CBUG use #1974 for more efficient metadata reads
+          KeyExtent extent = KeyExtent.fromThrift(job.getExtent());
+          TabletMetadata tabletMetadata = 
getContext().getAmple().readTablets().forTablet(extent)
+              .fetch(ColumnType.LOCATION, 
ColumnType.PREV_ROW).build().stream().findFirst()
+              .orElse(null);
+
+          if (tabletMetadata != null && 
tabletMetadata.getExtent().equals(extent)
+              && tabletMetadata.getLocation() != null
+              && tabletMetadata.getLocation().getType() == 
LocationType.CURRENT) {
+
+            TServerInstance tsi = tservers.stream()
+                .filter(
+                    t -> 
t.getHostAndPort().equals(tabletMetadata.getLocation().getHostAndPort()))
+                .findFirst().orElse(null);
+
+            if (null != tsi) {
+              TabletClientService.Client client = null;
+              try {
+                client = getTabletServerConnection(tsi);
+                boolean tserverMatch = 
client.isRunningExternalCompaction(TraceUtil.traceInfo(),
+                    getContext().rpcCreds(), job.getExternalCompactionId(), 
job.getExtent());
+                if (tserverMatch) {
+                  
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
+                      new RunningCompaction(job, 
ExternalCompactionUtil.getHostPortString(hp),
+                          tsi));
+                  matchFound = true;
+                }
+              } catch (TException e) {
+                LOG.warn("Failed to notify tserver {}",
+                    tabletMetadata.getLocation().getHostAndPort(), e);
+              } finally {
+                ThriftUtil.returnClient(client);
               }
-            } catch (TException e) {
-              LOG.error(
-                  "Error from tserver {} while trying to check if external 
compaction is running, trying next tserver",
-                  
ExternalCompactionUtil.getHostPortString(tsi.getHostAndPort()), e);
-            } finally {
-              ThriftUtil.returnClient(client);
             }
           }
+
+          // As a fallback, try them all
+          if (!matchFound) {
+            for (TServerInstance tsi : tservers) {
+              TabletClientService.Client client = null;
+              try {
+                client = getTabletServerConnection(tsi);
+                boolean tserverMatch = 
client.isRunningExternalCompaction(TraceUtil.traceInfo(),
+                    getContext().rpcCreds(), job.getExternalCompactionId(), 
job.getExtent());
+                if (tserverMatch) {
+                  
RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()),
+                      new RunningCompaction(job, 
ExternalCompactionUtil.getHostPortString(hp),
+                          tsi));
+                  matchFound = true;
+                }
+              } catch (TException e) {
+                LOG.error(
+                    "Error from tserver {} while trying to check if external 
compaction is running, trying next tserver",
+                    
ExternalCompactionUtil.getHostPortString(tsi.getHostAndPort()), e);
+              } finally {
+                ThriftUtil.returnClient(client);
+              }
+            }
+          }
+
           if (!matchFound) {
             // There is an external compaction running on a Compactor, but we 
can't resolve it to a
             // TServer?
diff --git 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ed46edd..698e5f8 100644
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -717,13 +717,13 @@ public class Compactor extends AbstractServer
         job = JOB_HOLDER.getJob();
       }
       while (null == job) {
-        //CBUG: It's possible that the call from the coordinator could
+        // CBUG: It's possible that the call from the coordinator could
         // be stuck here waiting for a compaction to be reserved and stall the
         // DeadCompactionDetector from contacting other compactors.
         UtilWaitThread.sleep(50);
         synchronized (JOB_HOLDER) {
           job = JOB_HOLDER.getJob();
-        }        
+        }
       }
       return job;
     }

Reply via email to