keith-turner commented on a change in pull request #2320:
URL: https://github.com/apache/accumulo/pull/2320#discussion_r750705528
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
}
}, 0, 5000, TimeUnit.MILLISECONDS);
+ // Periodically check that metadata of tablets matches what is held in
memory
+
ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(()
-> {
+ final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot =
onlineTablets.snapshot();
+
+ final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+ final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+ // Create subsets of tablets based on DataLevel: one set who's DataLevel
is USER and another
+ // containing the remaining tablets (those who's DataLevel is ROOT or
METADATA).
+ // This needs to happen so we can use .readTablets() on the
DataLevel.USER tablets in order
+ // to reduce RPCs.
+ // TODO: Push this partitioning, based on DataLevel, to ample.
+ onlineTabletsSnapshot.forEach((k, v) -> {
+ if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+ userTablets.add(k);
+ } else {
+ nonUserTablets.add(k);
+ }
+ });
+
+ List<TabletMetadata> tmdList;
+
+ // gather metadata for all tablets with DataLevel.USER using
readTablets()
+ try (TabletsMetadata tabletsMetadata =
getContext().getAmple().readTablets()
+ .forTablets(userTablets).fetch(FILES, LOGS, ECOMP,
PREV_ROW).build()) {
+ tmdList = new
ArrayList<>(IteratorUtils.toList(tabletsMetadata.iterator()));
+ }
+
+ // gather metadata for all tablets with DataLevel.ROOT or METADATA using
readTablet()
+ nonUserTablets.forEach(extent -> {
+ TabletMetadata tabletMetadata =
+ getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP,
PREV_ROW);
+ tmdList.add(tabletMetadata);
+ });
+
+ // for each tablet, compare its metadata to what is held in memory
+ for (TabletMetadata tabletMetadata : tmdList) {
Review comment:
If a tablet is missing from the metadata table for some reason, this loop
would not detect the problem, because it only looks at what was returned.
Maybe the code should instead loop over the onlineTabletsSnapshot set and look
up the corresponding entry read from the metadata table, logging an error if
none is found.
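A minimal sketch of that inverted loop, in case it helps the discussion. It
uses plain strings as stand-ins for KeyExtent and TabletMetadata so that it
runs on its own; the class name MissingMetadataCheck and the method
findMissing are hypothetical and not part of Accumulo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration only: String stands in for KeyExtent and
// TabletMetadata, which are not available outside the Accumulo code base.
class MissingMetadataCheck {

  // Drive the loop from the in-memory snapshot, not from what the metadata
  // read returned, so that a tablet with no metadata entry is noticed.
  static List<String> findMissing(SortedSet<String> onlineExtents,
      Map<String,String> metadataByExtent) {
    List<String> missing = new ArrayList<>();
    for (String extent : onlineExtents) {
      if (!metadataByExtent.containsKey(extent)) {
        missing.add(extent);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    SortedSet<String> online = new TreeSet<>(Arrays.asList("t1;m", "t1;z"));
    Map<String,String> metadata = new TreeMap<>();
    metadata.put("t1;m", "files-for-t1;m");

    // In the real check this would be a log.error(...) call.
    for (String extent : findMissing(online, metadata)) {
      System.err.println("No metadata found for online tablet " + extent);
    }
  }
}
```

In the real code the map would presumably be built from tmdList keyed by
tabletMetadata.getExtent(), but that wiring is an assumption on my part.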
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
}
}, 0, 5000, TimeUnit.MILLISECONDS);
+ // Periodically check that metadata of tablets matches what is held in
memory
+
ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(()
-> {
+ final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot =
onlineTablets.snapshot();
+
+ final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+ final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+ // Create subsets of tablets based on DataLevel: one set who's DataLevel
is USER and another
+ // containing the remaining tablets (those who's DataLevel is ROOT or
METADATA).
+ // This needs to happen so we can use .readTablets() on the
DataLevel.USER tablets in order
+ // to reduce RPCs.
+ // TODO: Push this partitioning, based on DataLevel, to ample.
+ onlineTabletsSnapshot.forEach((k, v) -> {
+ if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+ userTablets.add(k);
+ } else {
+ nonUserTablets.add(k);
+ }
+ });
+
+ List<TabletMetadata> tmdList;
+
Review comment:
The update counts should be captured before reading from the metadata table.
That makes it possible to detect changes that happen while the metadata is
being read.
```suggestion
Map<KeyExtent, Long> updateCounts = new HashMap<>();
onlineTabletsSnapshot.forEach((ke,tablet) -> {
updateCounts.put(ke, tablet.getUpdateCount());
});
```
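To illustrate why the ordering matters, here is a small self-contained sketch
of the pattern: record each counter first, do the slow read, then treat a
tablet as comparable only if its counter has not moved. FakeTablet, its
update() method, and the String extents are hypothetical stand-ins, not
Accumulo classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only: FakeTablet stands in for Tablet and
// String stands in for KeyExtent.
class UpdateCountSnapshot {

  static final class FakeTablet {
    private final AtomicLong updateCounter = new AtomicLong();

    long getUpdateCount() {
      return updateCounter.get();
    }

    // Simulates any change to the tablet's in-memory file/log state.
    void update() {
      updateCounter.incrementAndGet();
    }
  }

  // Step 1: capture the counters BEFORE the metadata read starts.
  static Map<String,Long> snapshotCounts(Map<String,FakeTablet> online) {
    Map<String,Long> counts = new HashMap<>();
    online.forEach((extent, tablet) -> counts.put(extent, tablet.getUpdateCount()));
    return counts;
  }

  // Step 2 (after the metadata read): a mismatch is only meaningful if the
  // tablet did not change while the read was in flight.
  static boolean unchangedSince(Map<String,Long> before, String extent, FakeTablet tablet) {
    Long earlier = before.get(extent);
    return earlier != null && earlier.longValue() == tablet.getUpdateCount();
  }

  public static void main(String[] args) {
    Map<String,FakeTablet> online = new HashMap<>();
    online.put("t1;m", new FakeTablet());
    online.put("t1;z", new FakeTablet());

    Map<String,Long> before = snapshotCounts(online);
    online.get("t1;z").update(); // concurrent change during the "read"

    online.forEach((extent, tablet) -> System.out
        .println(extent + " safe to compare: " + unchangedSince(before, extent, tablet)));
  }
}
```

If the counter were read only after the metadata read, a change that landed
during the read would look like a stable tablet and could produce a false
mismatch report.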
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
}
}, 0, 5000, TimeUnit.MILLISECONDS);
+ // Periodically check that metadata of tablets matches what is held in
memory
+
ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(()
-> {
+ final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot =
onlineTablets.snapshot();
+
+ final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+ final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+ // Create subsets of tablets based on DataLevel: one set who's DataLevel
is USER and another
+ // containing the remaining tablets (those who's DataLevel is ROOT or
METADATA).
+ // This needs to happen so we can use .readTablets() on the
DataLevel.USER tablets in order
+ // to reduce RPCs.
+ // TODO: Push this partitioning, based on DataLevel, to ample.
+ onlineTabletsSnapshot.forEach((k, v) -> {
+ if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+ userTablets.add(k);
+ } else {
+ nonUserTablets.add(k);
+ }
+ });
+
+ List<TabletMetadata> tmdList;
+
+ // gather metadata for all tablets with DataLevel.USER using
readTablets()
+ try (TabletsMetadata tabletsMetadata =
getContext().getAmple().readTablets()
Review comment:
I think Ample used to read tablets using only the end row, so it could
return entries whose prev end row differs from the one in the key extent. I
believe we have since changed Ample to return only entries where both the end
row and the prev end row match, but I am not completely sure. This needs to be
verified, because the correctness of this code depends on it.
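Until that is verified, a defensive comparison of the full extent is one
option. The sketch below shows the idea with a hypothetical Extent class
(table id, end row, prev end row, where null means unbounded); it is not the
Accumulo KeyExtent API and makes no claim about what Ample actually returns.

```java
import java.util.Objects;

// Hypothetical illustration only: Extent is a simplified stand-in for
// KeyExtent. A null endRow or prevEndRow means the range is unbounded there.
class ExtentMatchCheck {

  static final class Extent {
    final String tableId;
    final String endRow;
    final String prevEndRow;

    Extent(String tableId, String endRow, String prevEndRow) {
      this.tableId = Objects.requireNonNull(tableId);
      this.endRow = endRow;
      this.prevEndRow = prevEndRow;
    }
  }

  // A lookup keyed only by table id and end row can return a row for a
  // tablet with a different prev end row (for example after a split), so
  // all three fields are compared here.
  static boolean sameTablet(Extent requested, Extent returned) {
    return requested.tableId.equals(returned.tableId)
        && Objects.equals(requested.endRow, returned.endRow)
        && Objects.equals(requested.prevEndRow, returned.prevEndRow);
  }

  public static void main(String[] args) {
    Extent inMemory = new Extent("t1", "m", null);
    Extent fromMetadata = new Extent("t1", "m", "f"); // same end row, different prev end row
    System.out.println("same tablet: " + sameTablet(inMemory, fromMetadata));
  }
}
```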
##########
File path:
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
##########
@@ -791,6 +798,50 @@ public void run() {
}
}, 0, 5000, TimeUnit.MILLISECONDS);
+ // Periodically check that metadata of tablets matches what is held in
memory
+
ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(()
-> {
+ final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot =
onlineTablets.snapshot();
+
+ final SortedSet<KeyExtent> userTablets = new TreeSet<>();
+ final SortedSet<KeyExtent> nonUserTablets = new TreeSet<>();
+
+ // Create subsets of tablets based on DataLevel: one set who's DataLevel
is USER and another
+ // containing the remaining tablets (those who's DataLevel is ROOT or
METADATA).
+ // This needs to happen so we can use .readTablets() on the
DataLevel.USER tablets in order
+ // to reduce RPCs.
+ // TODO: Push this partitioning, based on DataLevel, to ample.
+ onlineTabletsSnapshot.forEach((k, v) -> {
+ if (Ample.DataLevel.of(k.tableId()) == Ample.DataLevel.USER) {
+ userTablets.add(k);
+ } else {
+ nonUserTablets.add(k);
+ }
+ });
+
+ List<TabletMetadata> tmdList;
+
+ // gather metadata for all tablets with DataLevel.USER using
readTablets()
+ try (TabletsMetadata tabletsMetadata =
getContext().getAmple().readTablets()
+ .forTablets(userTablets).fetch(FILES, LOGS, ECOMP,
PREV_ROW).build()) {
+ tmdList = new
ArrayList<>(IteratorUtils.toList(tabletsMetadata.iterator()));
+ }
+
+ // gather metadata for all tablets with DataLevel.ROOT or METADATA using
readTablet()
+ nonUserTablets.forEach(extent -> {
+ TabletMetadata tabletMetadata =
+ getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP,
PREV_ROW);
+ tmdList.add(tabletMetadata);
+ });
+
+ // for each tablet, compare its metadata to what is held in memory
+ for (TabletMetadata tabletMetadata : tmdList) {
+ KeyExtent extent = tabletMetadata.getExtent();
+ Tablet tablet = onlineTabletsSnapshot.get(extent);
+ Long counter = tablet.getUpdateCounter();
Review comment:
```suggestion
Long counter = updateCounts.get(extent);
```