This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new 1687ede Periodically verify tablet metadata (#2320) 1687ede is described below commit 1687ede211888da0dcd40323c894eff0e00fce26 Author: Dom G <47725857+domgargu...@users.noreply.github.com> AuthorDate: Mon Dec 6 16:08:30 2021 -0500 Periodically verify tablet metadata (#2320) Adds scheduled executor to periodically verify that metadata of tablets matches what is held in memory. Co-authored-by: Keith Turner <ktur...@apache.org> --- .../org/apache/accumulo/tserver/TabletServer.java | 57 ++++++++++++++++++++++ .../accumulo/tserver/tablet/DatafileManager.java | 15 ++++-- .../org/apache/accumulo/tserver/tablet/Tablet.java | 31 +++++++++--- 3 files changed, 94 insertions(+), 9 deletions(-) diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 63c40b0..96e8e66 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -19,6 +19,10 @@ package org.apache.accumulo.tserver; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.ECOMP; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.io.IOException; @@ -51,6 +55,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; import org.apache.accumulo.core.Constants; import 
org.apache.accumulo.core.client.Durability; @@ -68,6 +73,9 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.accumulo.core.metrics.MetricsUtil; import org.apache.accumulo.core.replication.thrift.ReplicationServicer; import org.apache.accumulo.core.rpc.ThriftUtil; @@ -794,6 +802,55 @@ public class TabletServer extends AbstractServer { } }, 0, 5000, TimeUnit.MILLISECONDS); + int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay + // Periodically check that metadata of tablets matches what is held in memory + ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> { + final SortedMap<KeyExtent,Tablet> onlineTabletsSnapshot = onlineTablets.snapshot(); + + final SortedSet<KeyExtent> userExtents = new TreeSet<>(); + final SortedSet<KeyExtent> nonUserExtents = new TreeSet<>(); + + // Create subsets of tablets based on DataLevel: one set whose DataLevel is USER and another + // containing the remaining tablets (those whose DataLevel is ROOT or METADATA). + // This needs to happen so we can use .readTablets() on the DataLevel.USER tablets in order + // to reduce RPCs. 
+ // TODO: Push this partitioning, based on DataLevel, to ample - accumulo issue #2373 + onlineTabletsSnapshot.forEach((ke, tablet) -> { + if (Ample.DataLevel.of(ke.tableId()) == Ample.DataLevel.USER) { + userExtents.add(ke); + } else { + nonUserExtents.add(ke); + } + }); + + Map<KeyExtent,Long> updateCounts = new HashMap<>(); + + // gather updateCounts for each tablet + onlineTabletsSnapshot.forEach((ke, tablet) -> { + updateCounts.put(ke, tablet.getUpdateCount()); + }); + + // gather metadata for all tablets with DataLevel.USER using readTablets() + try (TabletsMetadata tabletsMetadata = getContext().getAmple().readTablets() + .forTablets(userExtents).fetch(FILES, LOGS, ECOMP, PREV_ROW).build()) { + + Stream<TabletMetadata> userTablets = tabletsMetadata.stream(); + + // gather metadata for all tablets with DataLevel.ROOT or METADATA using readTablet() + Stream<TabletMetadata> nonUserTablets = nonUserExtents.stream().flatMap(extent -> Stream + .of(getContext().getAmple().readTablet(extent, FILES, LOGS, ECOMP, PREV_ROW))); + + // combine both streams of TabletMetadata + // for each tablet, compare its metadata to what is held in memory + Stream.concat(userTablets, nonUserTablets).forEach(tabletMetadata -> { + KeyExtent extent = tabletMetadata.getExtent(); + Tablet tablet = onlineTabletsSnapshot.get(extent); + Long counter = updateCounts.get(extent); + tablet.compareTabletInfo(counter, tabletMetadata); + }); + } + }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES); + final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000; context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 91826a2..1068a86 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -69,10 +69,12 @@ class DatafileManager { // ensure we only have one reader/writer of our bulk file notes at at time private final Object bulkFileImportLock = new Object(); + // This must be incremented whenever datafileSizes is mutated + private long updateCount; + DatafileManager(Tablet tablet, SortedMap<StoredTabletFile,DataFileValue> datafileSizes) { - for (Entry<StoredTabletFile,DataFileValue> datafiles : datafileSizes.entrySet()) { - this.datafileSizes.put(datafiles.getKey(), datafiles.getValue()); - } + this.datafileSizes.putAll(datafileSizes); + this.updateCount = 0L; this.tablet = tablet; } @@ -260,6 +262,7 @@ class DatafileManager { } datafileSizes.put(tpath.getKey(), tpath.getValue()); } + updateCount++; tablet.getTabletResources().importedMapFiles(); @@ -386,6 +389,7 @@ class DatafileManager { log.error("Adding file that is already in set {}", newFileStored); } datafileSizes.put(newFileStored, dfv); + updateCount++; } tablet.flushComplete(flushId); @@ -456,6 +460,7 @@ class DatafileManager { datafileSizes.put(newFile, dfv); // could be used by a follow on compaction in a multipass compaction } + updateCount++; tablet.computeNumEntries(); @@ -504,4 +509,8 @@ class DatafileManager { return datafileSizes.size(); } + public long getUpdateCount() { + return updateCount; + } + } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index ac98e80..56645d6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -82,6 +82,7 @@ import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; import 
org.apache.accumulo.core.metadata.schema.ExternalCompactionMetadata; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.security.Authorizations; @@ -1401,12 +1402,7 @@ public class Tablet { throw new RuntimeException(msg); } - if (!tabletMeta.getFilesMap().equals(getDatafileManager().getDatafileSizes())) { - String msg = "Data files in differ from in memory data " + extent + " " - + tabletMeta.getFilesMap() + " " + getDatafileManager().getDatafileSizes(); - log.error(msg); - throw new RuntimeException(msg); - } + compareToDataInMemory(tabletMeta); } catch (Exception e) { String msg = "Failed to do close consistency check for tablet " + extent; log.error(msg, e); @@ -1424,6 +1420,25 @@ public class Tablet { // TODO check lastFlushID and lostCompactID - ACCUMULO-1290 } + private void compareToDataInMemory(TabletMetadata tabletMetadata) { + if (!tabletMetadata.getFilesMap().equals(getDatafileManager().getDatafileSizes())) { + String msg = "Data files in " + extent + " differ from in-memory data " + + tabletMetadata.getFilesMap() + " " + getDatafileManager().getDatafileSizes(); + log.error(msg); + } + } + + public synchronized void compareTabletInfo(Long updateCounter, TabletMetadata tabletMetadata) { + if (isClosed() || isClosing()) { + return; + } + // if the counter didn't change, compare metadata to what is in memory + if (updateCounter == this.getUpdateCount()) { + this.compareToDataInMemory(tabletMetadata); + } + // if counter did change, don't compare metadata and try again later + } + /** * Returns an int representing the total block size of the files served by this tablet. 
* @@ -2234,6 +2249,10 @@ public class Tablet { return datafileManager; } + public synchronized long getUpdateCount() { + return getDatafileManager().getUpdateCount(); + } + TabletMemory getTabletMemory() { return tabletMemory; }