keith-turner commented on a change in pull request #2320:
URL: https://github.com/apache/accumulo/pull/2320#discussion_r750705528



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
       }
     }, 0, 5000, TimeUnit.MILLISECONDS);
 
+    // Periodically check that metadata of tablets matches what is held in memory
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+      final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+      final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+      final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+      // Create subsets of tablets based on DataLevel: one set whose DataLevel is USER and another
+      // containing the remaining tablets (those whose DataLevel is ROOT or METADATA).
+      // This needs to happen so we can use .readTablets() on the DataLevel.USER tablets in order
+      // to reduce RPCs.
+      // TODO: Push this partitioning, based on DataLevel, to ample.
+      onlineTabletsSnapshot.forEach((k, v) -> {
+        if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+          userTablets.add(k);
+        } else {
+          nonUserTablets.add(k);
+        }
+      });
+
+      List<TabletMetadata> tmdList;
+
+      // gather metadata for all tablets with DataLevel.USER using readTablets()
+      try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
+          .forTablets(userTablets).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+        tmdList = new ArrayList<>(IteratorUtils.toList(tabletsMetadata.iterator()));
+      }
+
+      // gather metadata for all tablets with DataLevel.ROOT or METADATA using readTablet()
+      nonUserTablets.forEach(extent -> {
+        TabletMetadata tabletMetadata =
+            getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP, PREV_ROW);
+        tmdList.add(tabletMetadata);
+      });
+
+      // for each tablet, compare its metadata to what is held in memory
+      for (TabletMetadata tabletMetadata : tmdList) {

Review comment:
       If for some reason a tablet was not found in the metadata table, that could be a problem this loop would not detect, because it only looks at what was returned. Maybe the code should instead loop over onlineTabletsSnapshot, look up the corresponding data read from the metadata table, and log an error if none is found.
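   A minimal, self-contained sketch of that inverted loop, under stated assumptions: it is hypothetical, extents are modeled as plain strings instead of `KeyExtent`, and it assumes the metadata results were first collected into a map keyed by extent (something like a `Map<KeyExtent,TabletMetadata>` built from `tmdList`). `findMissing` is an illustrative name, not an Accumulo API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch: drive the check from the in-memory snapshot rather than from
// the metadata results, so an online tablet with no metadata entry is reported.
// Extents are plain strings here; the real code would use KeyExtent and TabletMetadata.
public class MissingMetadataCheck {

  // Returns the online extents that have no entry in metadataByExtent.
  public static <M> List<String> findMissing(SortedMap<String,?> onlineSnapshot,
      Map<String,M> metadataByExtent) {
    List<String> missing = new ArrayList<>();
    for (String extent : onlineSnapshot.keySet()) {
      M tabletMetadata = metadataByExtent.get(extent);
      if (tabletMetadata == null) {
        // Stand-in for log.error(...) in the tablet server.
        System.err.println("ERROR: no metadata found for online tablet " + extent);
        missing.add(extent);
        continue;
      }
      // Otherwise compare tabletMetadata with the in-memory tablet state (omitted).
    }
    return missing;
  }

  public static void main(String[] args) {
    SortedMap<String,String> online = new TreeMap<>();
    online.put("1;a", "tabletA");
    online.put("1;b", "tabletB");
    Map<String,String> metadata = Map.of("1;a", "metadataForA");
    System.out.println(findMissing(online, metadata)); // prints [1;b]
  }
}
```

   Because the loop is driven by the snapshot, every online tablet is visited exactly once, whether or not the metadata read returned anything for it.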

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
       }
     }, 0, 5000, TimeUnit.MILLISECONDS);
 
+    // Periodically check that metadata of tablets matches what is held in memory
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+      final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+      final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+      final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+      // Create subsets of tablets based on DataLevel: one set whose DataLevel is USER and another
+      // containing the remaining tablets (those whose DataLevel is ROOT or METADATA).
+      // This needs to happen so we can use .readTablets() on the DataLevel.USER tablets in order
+      // to reduce RPCs.
+      // TODO: Push this partitioning, based on DataLevel, to ample.
+      onlineTabletsSnapshot.forEach((k, v) -> {
+        if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+          userTablets.add(k);
+        } else {
+          nonUserTablets.add(k);
+        }
+      });
+
+      List<TabletMetadata> tmdList;
+

Review comment:
       We should get the update counts before reading from the metadata table. This makes it possible to detect changes that happen while the metadata is being read.
   
   ```suggestion
   Map<KeyExtent, Long> updateCounts = new HashMap<>();
   onlineTabletsSnapshot.forEach((ke, tablet) -> {
     updateCounts.put(ke, tablet.getUpdateCount());
   });
   ```
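   To illustrate why the ordering matters, here is a hypothetical sketch in which `FakeTablet` stands in for `Tablet`, and its counter is assumed to be incremented on every change that also writes to the metadata table. Counts are captured first; a tablet whose live count differs afterwards changed during the metadata read, so a mismatch for it is not conclusive. `captureCounts` and `changedSince` are illustrative names only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of capturing update counts before the metadata read.
// FakeTablet stands in for Tablet; its counter is ASSUMED to be incremented on every
// change that also writes to the metadata table (flush, compaction, and so on).
public class UpdateCountSnapshot {

  public static class FakeTablet {
    private final AtomicLong updateCount = new AtomicLong();

    public long getUpdateCount() {
      return updateCount.get();
    }

    public void simulateUpdate() {
      updateCount.incrementAndGet();
    }
  }

  // Step 1: record each tablet's count. Must run BEFORE reading the metadata table.
  public static Map<String,Long> captureCounts(Map<String,FakeTablet> online) {
    Map<String,Long> updateCounts = new HashMap<>();
    online.forEach((extent, tablet) -> updateCounts.put(extent, tablet.getUpdateCount()));
    return updateCounts;
  }

  // Step 3: true if the tablet changed after step 1, i.e. possibly while the metadata
  // was being read, so a mismatch for this tablet is not conclusive.
  public static boolean changedSince(Map<String,Long> updateCounts, String extent,
      FakeTablet tablet) {
    return tablet.getUpdateCount() != updateCounts.get(extent).longValue();
  }

  public static void main(String[] args) {
    Map<String,FakeTablet> online = new HashMap<>();
    FakeTablet a = new FakeTablet();
    FakeTablet b = new FakeTablet();
    online.put("1;a", a);
    online.put("1;b", b);

    Map<String,Long> counts = captureCounts(online); // step 1
    b.simulateUpdate(); // step 2: metadata read happens here; b is updated concurrently
    System.out.println(changedSince(counts, "1;a", a)); // prints false
    System.out.println(changedSince(counts, "1;b", b)); // prints true
  }
}
```

   The comparison has to use the value captured in step 1; a counter read only after the metadata read would already include the concurrent update and hide it.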

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
       }
     }, 0, 5000, TimeUnit.MILLISECONDS);
 
+    // Periodically check that metadata of tablets matches what is held in memory
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+      final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+      final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+      final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+      // Create subsets of tablets based on DataLevel: one set whose DataLevel is USER and another
+      // containing the remaining tablets (those whose DataLevel is ROOT or METADATA).
+      // This needs to happen so we can use .readTablets() on the DataLevel.USER tablets in order
+      // to reduce RPCs.
+      // TODO: Push this partitioning, based on DataLevel, to ample.
+      onlineTabletsSnapshot.forEach((k, v) -> {
+        if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+          userTablets.add(k);
+        } else {
+          nonUserTablets.add(k);
+        }
+      });
+
+      List<TabletMetadata> tmdList;
+
+      // gather metadata for all tablets with DataLevel.USER using readTablets()
+      try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()

Review comment:
       I think Ample used to read tablets using only the end row, so it could return entries whose prev end row differs from the one in the KeyExtent. However, I think we changed Ample to only return data where both the end row and the prev end row match. I'm not completely sure, though; we need to verify that, as the correctness of this code depends on it.

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
       }
     }, 0, 5000, TimeUnit.MILLISECONDS);
 
+    // Periodically check that metadata of tablets matches what is held in memory
+    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+      final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot();
+
+      final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+      final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+      // Create subsets of tablets based on DataLevel: one set whose DataLevel is USER and another
+      // containing the remaining tablets (those whose DataLevel is ROOT or METADATA).
+      // This needs to happen so we can use .readTablets() on the DataLevel.USER tablets in order
+      // to reduce RPCs.
+      // TODO: Push this partitioning, based on DataLevel, to ample.
+      onlineTabletsSnapshot.forEach((k, v) -> {
+        if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+          userTablets.add(k);
+        } else {
+          nonUserTablets.add(k);
+        }
+      });
+
+      List<TabletMetadata> tmdList;
+
+      // gather metadata for all tablets with DataLevel.USER using readTablets()
+      try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets()
+          .forTablets(userTablets).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) {
+        tmdList = new ArrayList<>(IteratorUtils.toList(tabletsMetadata.iterator()));
+      }
+
+      // gather metadata for all tablets with DataLevel.ROOT or METADATA using readTablet()
+      nonUserTablets.forEach(extent -> {
+        TabletMetadata tabletMetadata =
+            getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP, PREV_ROW);
+        tmdList.add(tabletMetadata);
+      });
+
+      // for each tablet, compare its metadata to what is held in memory
+      for (TabletMetadata tabletMetadata : tmdList) {
+        KeyExtent extent = tabletMetadata.getExtent();
+        Tablet tablet = onlineTabletsSnapshot.get(extent);
+        Long counter = tablet.getUpdateCounter();

Review comment:
       ```suggestion
           Long counter = updateCounts.get(extent);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
