This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new cb25f26ab3 batches reading tablet metadata for refresh (#4439)
cb25f26ab3 is described below

commit cb25f26ab3be77dbfce54e9d1e7234e5974510ec
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Apr 17 17:38:37 2024 -0400

    batches reading tablet metadata for refresh (#4439)
    
    Changes tablet server code that refreshes tablet metadata to read tablet
    metadata for multiple tablets in a batch.  Also noticed the refresh code
    was not handling some race conditions correctly and could result in
    spurious error messages.  It was also not handling closed tablets.
    Refactored the code to return false when refresh is not possible for
    acceptable reasons instead of throwing an error.  This will cause the
    RPC to retry for those tablets later.  Also removed some code that was
    attempting to handle complex race conditions that would be hard to test;
    realized that if these conditions do happen, it would be best to retry
    rather than try to handle them.
    
    
    Co-authored-by: Dave Marion <dlmar...@apache.org>
---
 .../accumulo/tserver/TabletClientHandler.java      |  86 ++++---------
 .../org/apache/accumulo/tserver/tablet/Tablet.java | 137 +++++++++++++++++----
 2 files changed, 138 insertions(+), 85 deletions(-)

diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index f8532de0b8..a1770b90cb 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -31,6 +31,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -1096,80 +1097,47 @@ public class TabletClientHandler implements 
TabletServerClientService.Iface,
     // handle that more expensive case if needed.
     var tabletsSnapshot = server.getOnlineTablets();
 
-    Set<KeyExtent> notFound = new HashSet<>();
+    Map<KeyExtent,Tablet.RefreshSession> refreshSessions = new HashMap<>();
+
+    // Created this as synchronized list because it's passed to a lambda that 
could possibly run in
+    // another thread.
+    List<TKeyExtent> unableToRefresh = Collections.synchronizedList(new 
ArrayList<>());
 
     for (var tkextent : refreshes) {
       var extent = KeyExtent.fromThrift(tkextent);
-
       var tablet = tabletsSnapshot.get(extent);
       if (tablet != null) {
-        // ELASTICITY_TODO use a batch reader to read all tablets metadata at 
once instead of one by
-        // one. This may be a bit tricky from a synchronization perspective 
(with multiple tablets
-        // and multiple concurrent refresh request), so defer doing this until 
after removing
-        // functionality from the tablet. No need to make the change now and 
have to change it
-        // later.
-        tablet.refreshMetadata(RefreshPurpose.REFRESH_RPC);
+        refreshSessions.put(extent, tablet.startRefresh());
       } else {
-        notFound.add(extent);
+        unableToRefresh.add(extent.toThrift());
       }
     }
 
-    if (!notFound.isEmpty()) {
-      // Some tablets were not found, lets see if they are loading or moved to 
online while doing
-      // the refreshes above.
-      List<TKeyExtent> unableToRefresh = new ArrayList<>();
-      List<Tablet> foundTablets = new ArrayList<>();
-
-      synchronized (server.unopenedTablets) {
-        synchronized (server.openingTablets) {
-          synchronized (server.onlineTablets) {
-            // Get the snapshot again, however this time nothing will be 
changing while we iterate
-            // over the snapshot because all three locks are held.
-            tabletsSnapshot = server.getOnlineTablets();
-            for (var extent : notFound) {
-              // TODO investigate if its safe to ignore tablets in the 
unopened set because they
-              // have not yet read any metadata
-              if (server.unopenedTablets.contains(extent)
-                  || server.openingTablets.contains(extent)) {
-                // Can not refresh these tablets that are in the process of 
loading, but they may
-                // still need refreshing because we don't know when they read 
their metadata
-                // relative to the refresh event.
-                unableToRefresh.add(extent.toThrift());
-              } else {
-                var tablet = tabletsSnapshot.get(extent);
-                if (tablet != null) {
-                  // Intentionally not calling refresh on the tablet while 
holding these locks.
-                  foundTablets.add(tablet);
-                }
-              }
-            }
-
-            // If a tablet is not in any of the three sets then that is ok, it 
either means the
-            // tablet has not begun to load at all yet in which case it will 
see the metadata when
-            // it does load later OR it means the tablet has already 
completely unloaded. There is
-            // nothing to report back for either case.
-          }
+    // Read the metadata for all of the tablets that need to be refreshed. Its 
very important that
+    // this metadata read is done after calling Tablet.startRefresh, this is 
because the refresh
+    // session can detect changes that may necessitate rereading the tablet 
metadata. The refresh
+    // session assumes the metadata was read after the session was created in 
order to determine if
+    // its still valid.
+    try (var tabletsMeta = context.getAmple().readTablets()
+        .forTablets(refreshSessions.keySet(), Optional.of(e -> 
unableToRefresh.add(e.toThrift())))
+        .build()) {
+      for (var tabletMeta : tabletsMeta) {
+        var refreshSession = refreshSessions.get(tabletMeta.getExtent());
+        if (!refreshSession.refreshMetadata(RefreshPurpose.REFRESH_RPC, 
tabletMeta)) {
+          unableToRefresh.add(tabletMeta.getExtent().toThrift());
         }
       }
+    }
 
-      for (var tablet : foundTablets) {
-        tablet.refreshMetadata(RefreshPurpose.REFRESH_RPC);
-      }
-
-      if (log.isDebugEnabled()) {
-        for (var extent : unableToRefresh) {
-          // these tablet could hold up bulk import, lets logs the specific 
tablet in case it stays
-          // like this
-          log.debug("Unable to refresh tablet that is currently loading : {}",
-              KeyExtent.fromThrift(extent));
-        }
+    if (log.isDebugEnabled()) {
+      for (var extent : unableToRefresh) {
+        // these tablet could hold up bulk import, lets logs the specific 
tablet in case it stays
+        // like this
+        log.debug("Unable to refresh tablet : {}", 
KeyExtent.fromThrift(extent));
       }
-
-      return unableToRefresh;
     }
 
-    // no problematic extents to report
-    return List.of();
+    return unableToRefresh;
   }
 
   @Override
diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 2e87b25305..348807f161 100644
--- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -39,6 +39,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -134,7 +135,23 @@ public class Tablet extends TabletBase {
 
   private final AtomicLong dataSourceDeletions = new AtomicLong(0);
 
-  private volatile TabletMetadata latestMetadata;
+  // This class exists so that a single volatile can reference two variables. 
Coordinating reads and
+  // writes of two separate volatiles that depend on each other is really 
tricky, putting them under
+  // a single volatile removes the tricky part. One key factor to avoiding 
consistency issues is
+  // that instances of this class are immutable, so that should not be changed 
w/o considering the
+  // implications on multithreading.
+  private static class LatestMetadata {
+    final TabletMetadata tabletMetadata;
+    // this exists to detect changes in tabletMetadata
+    final long refreshCount;
+
+    private LatestMetadata(TabletMetadata tabletMetadata, long refreshCount) {
+      this.tabletMetadata = tabletMetadata;
+      this.refreshCount = refreshCount;
+    }
+  }
+
+  private final AtomicReference<LatestMetadata> latestMetadata;
 
   @Override
   public long getDataSourceDeletions() {
@@ -233,7 +250,8 @@ public class Tablet extends TabletBase {
 
     this.tabletServer = tabletServer;
     this.tabletResources = trm;
-    this.latestMetadata = metadata;
+    this.latestMetadata =
+        new AtomicReference<>(new LatestMetadata(metadata, 
RANDOM.get().nextLong()));
     this.tabletTime = TabletTime.getInstance(metadata.getTime());
     this.logId = tabletServer.createLogId();
 
@@ -331,7 +349,7 @@ public class Tablet extends TabletBase {
   }
 
   public TabletMetadata getMetadata() {
-    return latestMetadata;
+    return latestMetadata.get().tabletMetadata;
   }
 
   public void checkConditions(ConditionChecker checker, Authorizations 
authorizations,
@@ -1552,23 +1570,76 @@ public class Tablet extends TabletBase {
     MINC_COMPLETION, REFRESH_RPC, FLUSH_ID_UPDATE, LOAD
   }
 
-  public void refreshMetadata(RefreshPurpose refreshPurpose) {
+  public class RefreshSession {
+
+    private final long observedRefreshCount;
+
+    private RefreshSession(long observedRefreshCount) {
+      this.observedRefreshCount = observedRefreshCount;
+    }
+
+    /**
+     * Refresh tablet metadata using metadata that was read separately.
+     *
+     * @param tabletMetadata this tablet metadata must have been read after 
calling
+     *        {@link Tablet#startRefresh()}
+     */
+    public boolean refreshMetadata(RefreshPurpose refreshPurpose, 
TabletMetadata tabletMetadata) {
+      return Tablet.this.refreshMetadata(refreshPurpose, observedRefreshCount, 
tabletMetadata);
+    }
+  }
+
+  /**
+   * A refresh session allows code outside of this class to safely read tablet 
metadata and pass it
+   * back. This is useful for the case where many tablets need to be refreshed 
and we want to batch
+   * reading their metadata. Creating a refresh session will not block. A 
refresh session is able to
+   * detect changes in tablet metadata that happen during its existence and 
reread tablet metadata
+   * if necessary.
+   */
+  public RefreshSession startRefresh() {
+    return new RefreshSession(latestMetadata.get().refreshCount);
+  }
+
+  private boolean refreshMetadata(RefreshPurpose refreshPurpose, Long 
observedRefreshCount,
+      TabletMetadata tabletMetadata) {
+
     refreshLock.lock();
     try {
+      var prevMetadata = latestMetadata.get();
+      // if the tablet metadata passed in is stale, then reread it
+      if (observedRefreshCount == null || 
!observedRefreshCount.equals(prevMetadata.refreshCount)) {
+        if (observedRefreshCount != null) {
+          log.debug(
+              "Metadata read outside of refresh lock is no longer valid, 
rereading metadata. {} {} {}",
+              extent, observedRefreshCount, prevMetadata.refreshCount);
+        }
+        // do not want to hold tablet lock while doing metadata read as this 
could negatively impact
+        // scans
+        tabletMetadata = getContext().getAmple().readTablet(getExtent());
+        if (tabletMetadata == null) {
+          log.debug(
+              "Unable to refresh tablet {} for {} because it no longer exists 
in metadata table",
+              extent, refreshPurpose);
+          return false;
+        }
+      } else {
+        // when observedRefreshCount is not null, tabletMetadata must not be 
null
+        Preconditions.checkArgument(tabletMetadata != null);
+      }
 
-      // do not want to hold tablet lock while doing metadata read as this 
could negatively impact
-      // scans
-      TabletMetadata tabletMetadata = 
getContext().getAmple().readTablet(getExtent());
-
-      Preconditions.checkState(tabletMetadata != null, "Tablet no longer exits 
%s", getExtent());
-      Preconditions.checkState(
-          
tabletServer.getTabletSession().equals(tabletMetadata.getLocation().getServerInstance()),
-          "Tablet %s location %s is not this tserver %s", getExtent(), 
tabletMetadata.getLocation(),
-          tabletServer.getTabletSession());
+      if (tabletMetadata.getLocation() == null || 
!tabletServer.getTabletSession()
+          .equals(tabletMetadata.getLocation().getServerInstance())) {
+        log.debug("Unable to refresh tablet {} for {} because it has a 
different location {}",
+            extent, refreshPurpose, tabletMetadata.getLocation());
+        return false;
+      }
 
       synchronized (this) {
-        var prevMetadata = latestMetadata;
-        latestMetadata = tabletMetadata;
+        if (isClosed()) {
+          log.debug("Unable to refresh tablet {} for {} because the tablet is 
closed", extent,
+              refreshPurpose);
+          return false;
+        }
 
         // Its expected that what is persisted should be less than equal to 
the time that tablet has
         // in memory.
@@ -1576,16 +1647,11 @@ public class Tablet extends TabletBase {
             "Time in metadata is ahead of tablet %s memory:%s metadata:%s", 
extent, tabletTime,
             tabletMetadata.getTime());
 
-        if (log.isDebugEnabled() && 
!prevMetadata.getFiles().equals(latestMetadata.getFiles())) {
-          SetView<StoredTabletFile> removed =
-              Sets.difference(prevMetadata.getFiles(), 
latestMetadata.getFiles());
-          SetView<StoredTabletFile> added =
-              Sets.difference(latestMetadata.getFiles(), 
prevMetadata.getFiles());
-          log.debug("Tablet {} was refreshed because {}. Files removed: [{}] 
Files added: [{}]",
-              this.getExtent(), refreshPurpose,
-              
removed.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(",")),
-              
added.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(",")));
-        }
+        // must update latestMetadata before computeNumEntries() is called
+        Preconditions.checkState(
+            latestMetadata.compareAndSet(prevMetadata,
+                new LatestMetadata(tabletMetadata, prevMetadata.refreshCount + 
1)),
+            "A concurrency bug exists in the code, something is setting 
latestMetadata without holding the refreshLock.");
 
         if (refreshPurpose == RefreshPurpose.MINC_COMPLETION) {
           // Atomically replace the in memory map with the new file. Before 
this synch block a scan
@@ -1601,7 +1667,7 @@ public class Tablet extends TabletBase {
 
           // important to call this after updating latestMetadata and 
tabletMemory
           computeNumEntries();
-        } else if 
(!prevMetadata.getFilesMap().equals(latestMetadata.getFilesMap())) {
+        } else if 
(!prevMetadata.tabletMetadata.getFilesMap().equals(getMetadata().getFilesMap()))
 {
 
           // the files changed, incrementing this will cause scans to switch 
data sources
           dataSourceDeletions.incrementAndGet();
@@ -1610,6 +1676,18 @@ public class Tablet extends TabletBase {
           computeNumEntries();
         }
       }
+
+      if (log.isDebugEnabled()
+          && 
!prevMetadata.tabletMetadata.getFiles().equals(getMetadata().getFiles())) {
+        SetView<StoredTabletFile> removed =
+            Sets.difference(prevMetadata.tabletMetadata.getFiles(), 
getMetadata().getFiles());
+        SetView<StoredTabletFile> added =
+            Sets.difference(getMetadata().getFiles(), 
prevMetadata.tabletMetadata.getFiles());
+        log.debug("Tablet {} was refreshed because {}. Files removed: [{}] 
Files added: [{}]",
+            this.getExtent(), refreshPurpose,
+            
removed.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(",")),
+            
added.stream().map(StoredTabletFile::getFileName).collect(Collectors.joining(",")));
+      }
     } finally {
       refreshLock.unlock();
     }
@@ -1618,6 +1696,13 @@ public class Tablet extends TabletBase {
       scanfileManager.removeFilesAfterScan(getMetadata().getScans(),
           Location.current(tabletServer.getTabletSession()));
     }
+
+    return true;
+  }
+
+  public void refreshMetadata(RefreshPurpose refreshPurpose) {
+    Preconditions.checkState(refreshMetadata(refreshPurpose, null, null), 
"Failed to refresh %s",
+        extent);
   }
 
   public long getLastAccessTime() {

Reply via email to