This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new e357d8a Refactored tablet loading code to use Ample (#1302) e357d8a is described below commit e357d8ab5a29f28794593d31ca2981619266499c Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Aug 2 11:59:28 2019 -0400 Refactored tablet loading code to use Ample (#1302) --- .../accumulo/core/metadata/MetadataTable.java | 1 - .../core/metadata/schema/MetadataSchema.java | 24 ++- .../core/metadata/schema/TabletMetadata.java | 67 +++++-- .../core/metadata/schema/TabletMetadataTest.java | 7 +- .../server/constraints/MetadataConstraints.java | 4 +- .../org/apache/accumulo/server/fs/FileRef.java | 5 + .../server/iterators/MetadataBulkLoadFilter.java | 4 +- .../accumulo/server/util/MasterMetadataUtil.java | 45 ++--- .../accumulo/server/util/MetadataTableUtil.java | 34 +--- .../master/tableOps/bulkVer1/CopyFailed.java | 4 +- .../master/tableOps/bulkVer2/LoadFiles.java | 2 +- .../org/apache/accumulo/tserver/TabletServer.java | 214 +++++---------------- .../apache/accumulo/tserver/tablet/TabletData.java | 101 +++------- .../accumulo/tserver/CheckTabletMetadataTest.java | 22 ++- .../accumulo/test/functional/SplitRecoveryIT.java | 49 +++-- 15 files changed, 235 insertions(+), 348 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataTable.java index b5208e3..2bc9906 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/MetadataTable.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/MetadataTable.java @@ -25,5 +25,4 @@ public class MetadataTable { public static final TableId ID = TableId.of("!0"); public static final String NAME = Namespace.ACCUMULO.name() + ".metadata"; - } diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 8da43a4..9d23ae0 
100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -25,8 +25,10 @@ import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.schema.Section; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.fate.FateTxId; import org.apache.hadoop.io.Text; /** @@ -86,11 +88,15 @@ public class MetadataSchema { /** * A temporary field in case a split fails and we need to roll back */ - public static final ColumnFQ OLD_PREV_ROW_COLUMN = new ColumnFQ(NAME, new Text("oldprevrow")); + public static final String OLD_PREV_ROW_QUAL = "oldprevrow"; + public static final ColumnFQ OLD_PREV_ROW_COLUMN = + new ColumnFQ(NAME, new Text(OLD_PREV_ROW_QUAL)); /** * A temporary field for splits to optimize certain operations */ - public static final ColumnFQ SPLIT_RATIO_COLUMN = new ColumnFQ(NAME, new Text("splitRatio")); + public static final String SPLIT_RATIO_QUAL = "splitRatio"; + public static final ColumnFQ SPLIT_RATIO_COLUMN = + new ColumnFQ(NAME, new Text(SPLIT_RATIO_QUAL)); } /** @@ -167,6 +173,20 @@ public class MetadataSchema { public static class BulkFileColumnFamily { public static final String STR_NAME = "loaded"; public static final Text NAME = new Text(STR_NAME); + + public static long getBulkLoadTid(Value v) { + return getBulkLoadTid(v.toString()); + } + + public static long getBulkLoadTid(String vs) { + if (FateTxId.isFormatedTid(vs)) { + return FateTxId.fromString(vs); + } else { + // a new serialization format was introduced in 2.0. This code supports deserializing the + // old format. 
+ return Long.parseLong(vs); + } + } } /** diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java index dc30c46..025d4be 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadata.java @@ -21,7 +21,9 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_QUAL; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.OLD_PREV_ROW_QUAL; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_QUAL; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.SPLIT_RATIO_QUAL; import java.util.Collection; import java.util.EnumSet; @@ -31,7 +33,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.OptionalLong; -import java.util.Set; import java.util.SortedMap; import java.util.function.Function; @@ -61,7 +62,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.collect.Iterators; @@ -70,11 +70,13 @@ public class TabletMetadata { private TableId tableId; private Text prevEndRow; private boolean sawPrevEndRow = 
false; + private Text oldPrevEndRow; + private boolean sawOldPrevEndRow = false; private Text endRow; private Location location; private Map<String,DataFileValue> files; private List<String> scans; - private Set<String> loadedFiles; + private Map<String,Long> loadedFiles; private EnumSet<ColumnType> fetchedCols; private KeyExtent extent; private Location last; @@ -85,13 +87,27 @@ public class TabletMetadata { private OptionalLong flush = OptionalLong.empty(); private List<LogEntry> logs; private OptionalLong compact = OptionalLong.empty(); + private Double splitRatio = null; public enum LocationType { CURRENT, FUTURE, LAST } public enum ColumnType { - LOCATION, PREV_ROW, FILES, LAST, LOADED, SCANS, DIR, TIME, CLONED, FLUSH_ID, LOGS, COMPACT_ID + LOCATION, + PREV_ROW, + OLD_PREV_ROW, + FILES, + LAST, + LOADED, + SCANS, + DIR, + TIME, + CLONED, + FLUSH_ID, + LOGS, + COMPACT_ID, + SPLIT_RATIO } public static class Location { @@ -159,6 +175,20 @@ public class TabletMetadata { return sawPrevEndRow; } + public Text getOldPrevEndRow() { + ensureFetched(ColumnType.OLD_PREV_ROW); + if (!sawOldPrevEndRow) { + throw new IllegalStateException( + "No old prev endrow seen. 
tableId: " + tableId + " endrow: " + endRow); + } + return oldPrevEndRow; + } + + public boolean sawOldPrevEndRow() { + ensureFetched(ColumnType.OLD_PREV_ROW); + return sawOldPrevEndRow; + } + public Text getEndRow() { return endRow; } @@ -173,7 +203,7 @@ public class TabletMetadata { return location != null && location.getType() == LocationType.CURRENT; } - public Set<String> getLoaded() { + public Map<String,Long> getLoaded() { ensureFetched(ColumnType.LOADED); return loadedFiles; } @@ -228,12 +258,18 @@ public class TabletMetadata { return compact; } + public Double getSplitRatio() { + ensureFetched(ColumnType.SPLIT_RATIO); + return splitRatio; + } + public SortedMap<Key,Value> getKeyValues() { Preconditions.checkState(keyValues != null, "Requested key values when it was not saved"); return keyValues; } - static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter, + @VisibleForTesting + public static TabletMetadata convertRow(Iterator<Entry<Key,Value>> rowIter, EnumSet<ColumnType> fetchedColumns, boolean buildKeyValueMap) { Objects.requireNonNull(rowIter); @@ -246,7 +282,7 @@ public class TabletMetadata { var filesBuilder = ImmutableMap.<String,DataFileValue>builder(); var scansBuilder = ImmutableList.<String>builder(); var logsBuilder = ImmutableList.<LogEntry>builder(); - final var loadedFilesBuilder = ImmutableSet.<String>builder(); + final var loadedFilesBuilder = ImmutableMap.<String,Long>builder(); ByteSequence row = null; while (rowIter.hasNext()) { @@ -272,9 +308,18 @@ public class TabletMetadata { switch (fam.toString()) { case TabletColumnFamily.STR_NAME: - if (PREV_ROW_QUAL.equals(qual)) { - te.prevEndRow = KeyExtent.decodePrevEndRow(kv.getValue()); - te.sawPrevEndRow = true; + switch (qual) { + case PREV_ROW_QUAL: + te.prevEndRow = KeyExtent.decodePrevEndRow(kv.getValue()); + te.sawPrevEndRow = true; + break; + case OLD_PREV_ROW_QUAL: + te.oldPrevEndRow = KeyExtent.decodePrevEndRow(kv.getValue()); + te.sawOldPrevEndRow = true; + break; + 
case SPLIT_RATIO_QUAL: + te.splitRatio = Double.parseDouble(val); + break; } break; case ServerColumnFamily.STR_NAME: @@ -297,7 +342,7 @@ public class TabletMetadata { filesBuilder.put(qual, new DataFileValue(val)); break; case BulkFileColumnFamily.STR_NAME: - loadedFilesBuilder.add(qual); + loadedFilesBuilder.put(qual, BulkFileColumnFamily.getBulkLoadTid(val)); break; case CurrentLocationColumnFamily.STR_NAME: te.setLocationOnce(val, qual, LocationType.CURRENT); diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java index c7a06d4..5d94876 100644 --- a/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java +++ b/core/src/test/java/org/apache/accumulo/core/metadata/schema/TabletMetadataTest.java @@ -48,6 +48,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.fate.FateTxId; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -64,8 +65,8 @@ public class TabletMetadataTest { FLUSH_COLUMN.put(mutation, new Value("6")); TIME_COLUMN.put(mutation, new Value("M123456789")); - mutation.at().family(BulkFileColumnFamily.NAME).qualifier("bf1").put(""); - mutation.at().family(BulkFileColumnFamily.NAME).qualifier("bf2").put(""); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier("bf1").put(FateTxId.formatTid(56)); + mutation.at().family(BulkFileColumnFamily.NAME).qualifier("bf2").put(FateTxId.formatTid(59)); mutation.at().family(ClonedColumnFamily.NAME).qualifier("").put("OK"); @@ -102,7 +103,7 @@ public class TabletMetadataTest { assertEquals(Map.of("df1", dfv1, "df2", dfv2), tm.getFilesMap()); assertEquals(6L, tm.getFlushId().getAsLong()); assertEquals(rowMap, 
tm.getKeyValues()); - assertEquals(Set.of("bf1", "bf2"), Set.copyOf(tm.getLoaded())); + assertEquals(Map.of("bf1", 56L, "bf2", 59L), tm.getLoaded()); assertEquals(HostAndPort.fromParts("server1", 8555), tm.getLocation().getHostAndPort()); assertEquals("s001", tm.getLocation().getSession()); assertEquals(LocationType.CURRENT, tm.getLocation().getType()); diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index 4e9ad98..0b59364 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; @@ -44,7 +45,6 @@ import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooUtil; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.TransactionWatcher.Arbitrator; import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; import org.apache.hadoop.io.Text; @@ -230,7 +230,7 @@ public class MetadataConstraints implements Constraint { } if 
(!isSplitMutation && !isLocationMutation) { - long tid = MetadataTableUtil.getBulkLoadTid(new Value(tidString)); + long tid = BulkFileColumnFamily.getBulkLoadTid(new Value(tidString)); try { if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator(context) diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java index 361aaab..c149cdd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileRef.java @@ -17,6 +17,7 @@ package org.apache.accumulo.server.fs; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.hadoop.fs.Path; @@ -36,6 +37,10 @@ public class FileRef implements Ample.FileMeta, Comparable<FileRef> { this(key.getColumnQualifier().toString(), fs.getFullPath(key)); } + public FileRef(VolumeManager fs, String metaReference, TableId tableId) { + this(metaReference, fs.getFullPath(tableId, metaReference)); + } + public FileRef(String metaReference, Path fullReference) { this.metaReference = metaReference; this.fullReference = fullReference; diff --git a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java index 38ebe85..816e900 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/iterators/MetadataBulkLoadFilter.java @@ -28,8 +28,8 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.server.ServerContext; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.TransactionWatcher.Arbitrator; import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; import org.slf4j.Logger; @@ -51,7 +51,7 @@ public class MetadataBulkLoadFilter extends Filter { @Override public boolean accept(Key k, Value v) { if (!k.isDeleted() && k.compareColumnFamily(TabletsSection.BulkFileColumnFamily.NAME) == 0) { - long txid = MetadataTableUtil.getBulkLoadTid(v); + long txid = BulkFileColumnFamily.getBulkLoadTid(v); Status status = bulkTxStatusCache.get(txid); if (status == null) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java index 5fd79a2..1c50b0e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.server.util; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; import java.util.ArrayList; @@ -46,9 +45,9 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.security.Authorizations; -import 
org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.FileRef; @@ -95,43 +94,26 @@ public class MasterMetadataUtil { tablet.mutate(); } - public static KeyExtent fixSplit(ServerContext context, Text metadataEntry, - SortedMap<ColumnFQ,Value> columns, ZooLock lock) throws AccumuloException { - log.info("Incomplete split {} attempting to fix", metadataEntry); - - Value oper = columns.get(TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN); - - if (columns.get(TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN) == null) { - throw new IllegalArgumentException( - "Metadata entry does not have split ratio (" + metadataEntry + ")"); - } - - double splitRatio = Double.parseDouble( - new String(columns.get(TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN).get(), UTF_8)); - - Value prevEndRowIBW = columns.get(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN); + public static KeyExtent fixSplit(ServerContext context, TabletMetadata meta, ZooLock lock) + throws AccumuloException { + log.info("Incomplete split {} attempting to fix", meta.getExtent()); - if (prevEndRowIBW == null) { + if (meta.getSplitRatio() == null) { throw new IllegalArgumentException( - "Metadata entry does not have prev row (" + metadataEntry + ")"); + "Metadata entry does not have split ratio (" + meta.getExtent() + ")"); } - Value time = columns.get(TabletsSection.ServerColumnFamily.TIME_COLUMN); - - if (time == null) { + if (meta.getTime() == null) { throw new IllegalArgumentException( - "Metadata entry does not have time (" + metadataEntry + ")"); + "Metadata entry does not have time (" + meta.getExtent() + ")"); } - Text metadataPrevEndRow = KeyExtent.decodePrevEndRow(prevEndRowIBW); - - TableId tableId = (new KeyExtent(metadataEntry, (Text) null)).getTableId(); - - return fixSplit(context, tableId, metadataEntry, metadataPrevEndRow, oper, splitRatio, lock); + return 
fixSplit(context, meta.getTableId(), meta.getExtent().getMetadataEntry(), + meta.getPrevEndRow(), meta.getOldPrevEndRow(), meta.getSplitRatio(), lock); } private static KeyExtent fixSplit(ServerContext context, TableId tableId, Text metadataEntry, - Text metadataPrevEndRow, Value oper, double splitRatio, ZooLock lock) + Text metadataPrevEndRow, Text oper, double splitRatio, ZooLock lock) throws AccumuloException { if (metadataPrevEndRow == null) // something is wrong, this should not happen... if a tablet is split, it will always have a @@ -147,9 +129,8 @@ public class MasterMetadataUtil { if (!scanner2.iterator().hasNext()) { log.info("Rolling back incomplete split {} {}", metadataEntry, metadataPrevEndRow); - MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, - lock); - return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper)); + MetadataTableUtil.rollBackSplit(metadataEntry, oper, context, lock); + return new KeyExtent(metadataEntry, oper); } else { log.info("Finishing incomplete split {} {}", metadataEntry, metadataPrevEndRow); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index 1dedfc8..57edd93 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -68,6 +68,7 @@ import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import 
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; @@ -697,18 +698,6 @@ public class MetadataTableUtil { tablet.mutate(); } - public static long getBulkLoadTid(Value v) { - String vs = v.toString(); - - if (FateTxId.isFormatedTid(vs)) { - return FateTxId.fromString(vs); - } else { - // a new serialization format was introduce in 2.0. This code support deserializing the old - // format. - return Long.parseLong(vs); - } - } - public static void removeBulkLoadEntries(AccumuloClient client, TableId tableId, long tid) throws Exception { try ( @@ -720,7 +709,7 @@ public class MetadataTableUtil { for (Entry<Key,Value> entry : mscanner) { log.trace("Looking at entry {} with tid {}", entry, tid); - long entryTid = getBulkLoadTid(entry.getValue()); + long entryTid = BulkFileColumnFamily.getBulkLoadTid(entry.getValue()); if (tid == entryTid) { log.trace("deleting entry {}", entry); Key key = entry.getKey(); @@ -732,25 +721,6 @@ public class MetadataTableUtil { } } - public static Map<Long,? extends Collection<FileRef>> getBulkFilesLoaded(ServerContext context, - KeyExtent extent) { - Text metadataRow = extent.getMetadataEntry(); - Map<Long,List<FileRef>> result = new HashMap<>(); - - VolumeManager fs = context.getVolumeManager(); - try (Scanner scanner = new ScannerImpl(context, - extent.isMeta() ? 
RootTable.ID : MetadataTable.ID, Authorizations.EMPTY)) { - scanner.setRange(new Range(metadataRow)); - scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); - for (Entry<Key,Value> entry : scanner) { - Long tid = getBulkLoadTid(entry.getValue()); - List<FileRef> lst = result.computeIfAbsent(tid, k -> new ArrayList<FileRef>()); - lst.add(new FileRef(fs, entry.getKey())); - } - } - return result; - } - public static void addBulkLoadInProgressFlag(ServerContext context, String path, long fateTxid) { Mutation m = new Mutation(MetadataSchema.BlipSection.getRowPrefix() + path); diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java index 39f510c..fe36533 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/CopyFailed.java @@ -36,6 +36,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.master.thrift.BulkImportState; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.fate.FateTxId; import org.apache.accumulo.fate.Repo; @@ -45,7 +46,6 @@ import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.hadoop.fs.Path; import org.apache.thrift.TException; @@ -123,7 +123,7 @@ class 
CopyFailed extends MasterRepo { mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); for (Entry<Key,Value> entry : mscanner) { - if (MetadataTableUtil.getBulkLoadTid(entry.getValue()) == tid) { + if (BulkFileColumnFamily.getBulkLoadTid(entry.getValue()) == tid) { FileRef loadedFile = new FileRef(fs, entry.getKey()); String absPath = failures.remove(loadedFile); if (absPath != null) { diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java index 7f069ad..1798458 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/LoadFiles.java @@ -211,7 +211,7 @@ class LoadFiles extends MasterRepo { server = location.getHostAndPort(); } - Set<String> loadedFiles = tablet.getLoaded(); + Set<String> loadedFiles = tablet.getLoaded().keySet(); Map<String,MapFileInfo> thriftImports = new HashMap<>(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 5d1ce6c..c76ca45 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -39,7 +39,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.SortedMap; @@ -69,7 +68,6 @@ import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.CompressedIterators; import org.apache.accumulo.core.clientImpl.DurabilityImpl; -import org.apache.accumulo.core.clientImpl.ScannerImpl; import 
org.apache.accumulo.core.clientImpl.Tables; import org.apache.accumulo.core.clientImpl.TabletLocator; import org.apache.accumulo.core.clientImpl.TabletType; @@ -90,7 +88,6 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.NamespaceId; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; import org.apache.accumulo.core.dataImpl.thrift.InitialScan; @@ -120,7 +117,6 @@ import org.apache.accumulo.core.master.thrift.TabletLoadState; import org.apache.accumulo.core.master.thrift.TabletServerStatus; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; @@ -152,7 +148,6 @@ import org.apache.accumulo.core.tabletserver.thrift.TabletStats; import org.apache.accumulo.core.trace.TraceUtil; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.ByteBufferUtil; -import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.ComparablePair; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.HostAndPort; @@ -212,7 +207,6 @@ import org.apache.accumulo.server.security.delegation.ZooAuthenticationKeyWatche import org.apache.accumulo.server.util.FileSystemMonitor; import org.apache.accumulo.server.util.Halt; import org.apache.accumulo.server.util.MasterMetadataUtil; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.util.ServerBulkImportStatus; import 
org.apache.accumulo.server.util.time.RelativeTime; import org.apache.accumulo.server.util.time.SimpleTimer; @@ -2435,30 +2429,33 @@ public class TabletServer extends AbstractServer { // check Metadata table before accepting assignment Text locationToOpen = null; - SortedMap<Key,Value> tabletsKeyValues = new TreeMap<>(); + TabletMetadata tabletMetadata = null; + boolean canLoad = false; try { - Pair<Text,KeyExtent> pair = - verifyTabletInformation(getContext(), extent, TabletServer.this.getTabletSession(), - tabletsKeyValues, getClientAddressString(), getLock()); - if (pair != null) { - locationToOpen = pair.getFirst(); - if (pair.getSecond() != null) { - synchronized (openingTablets) { - openingTablets.remove(extent); - openingTablets.notifyAll(); - // it expected that the new extent will overlap the old one... if it does not, it - // should not be added to unopenedTablets - if (!KeyExtent.findOverlapping(extent, new TreeSet<>(Arrays.asList(pair.getSecond()))) - .contains(pair.getSecond())) { - throw new IllegalStateException( - "Fixed split does not overlap " + extent + " " + pair.getSecond()); - } - unopenedTablets.add(pair.getSecond()); + tabletMetadata = getContext().getAmple().readTablet(extent); + + canLoad = checkTabletMetadata(extent, TabletServer.this.getTabletSession(), tabletMetadata); + + if (canLoad && tabletMetadata.sawOldPrevEndRow()) { + KeyExtent fixedExtent = + MasterMetadataUtil.fixSplit(getContext(), tabletMetadata, getLock()); + + synchronized (openingTablets) { + openingTablets.remove(extent); + openingTablets.notifyAll(); + // it expected that the new extent will overlap the old one... if it does not, it + // should not be added to unopenedTablets + if (!KeyExtent.findOverlapping(extent, new TreeSet<>(Arrays.asList(fixedExtent))) + .contains(fixedExtent)) { + throw new IllegalStateException( + "Fixed split does not overlap " + extent + " " + fixedExtent); } - // split was rolled back... 
try again - new AssignmentHandler(pair.getSecond()).run(); - return; + unopenedTablets.add(fixedExtent); } + // split was rolled back... try again + new AssignmentHandler(fixedExtent).run(); + return; + } } catch (Exception e) { synchronized (openingTablets) { @@ -2470,7 +2467,7 @@ public class TabletServer extends AbstractServer { throw new RuntimeException(e); } - if (locationToOpen == null) { + if (!canLoad) { log.debug("Reporting tablet {} assignment failure: unable to verify Tablet Information", extent); synchronized (openingTablets) { @@ -2491,9 +2488,9 @@ public class TabletServer extends AbstractServer { resourceManager.createTabletResourceManager(extent, getTableConfiguration(extent)); TabletData data; if (extent.isRootTablet()) { - data = new TabletData(getContext(), fs, getTableConfiguration(extent)); + data = new TabletData(getContext(), fs, getTableConfiguration(extent), tabletMetadata); } else { - data = new TabletData(extent, fs, tabletsKeyValues.entrySet().iterator()); + data = new TabletData(extent, fs, tabletMetadata); } tablet = new Tablet(TabletServer.this, extent, trm, data); @@ -2956,156 +2953,37 @@ public class TabletServer extends AbstractServer { SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000); } - private static Pair<Text,KeyExtent> verifyRootTablet(ServerContext context, - TServerInstance instance) throws AccumuloException { - - TabletMetadata rootMeta = context.getAmple().readTablet(RootTable.EXTENT); - - if (rootMeta.getLocation() == null) { - throw new AccumuloException("Illegal state: location is not set in zookeeper"); - } - - Location loc = rootMeta.getLocation(); - - if (loc.getType() == LocationType.FUTURE && !instance.equals(new TServerInstance(loc))) { - throw new AccumuloException( - "Future location is not to this server for the root tablet " + loc); - } - - if (loc.getType() == LocationType.CURRENT) { - throw new AccumuloException("Root tablet already has a location set " + loc); - } - 
- return new Pair<>(new Text(rootMeta.getDir()), null); - } - - public static Pair<Text,KeyExtent> verifyTabletInformation(ServerContext context, - KeyExtent extent, TServerInstance instance, final SortedMap<Key,Value> tabletsKeyValues, - String clientAddress, ZooLock lock) throws DistributedStoreException, AccumuloException { - Objects.requireNonNull(tabletsKeyValues); + static boolean checkTabletMetadata(KeyExtent extent, TServerInstance instance, + TabletMetadata meta) throws AccumuloException { - log.debug("verifying extent {}", extent); - if (extent.isRootTablet()) { - return verifyRootTablet(context, instance); - } - TableId tableToVerify = MetadataTable.ID; - if (extent.isMeta()) { - tableToVerify = RootTable.ID; + if (!meta.sawPrevEndRow()) { + throw new AccumuloException("Metadata entry does not have prev row (" + meta.getTableId() + + " " + meta.getEndRow() + ")"); } - List<ColumnFQ> columnsToFetch = - Arrays.asList(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, - TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, - TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, - TabletsSection.ServerColumnFamily.TIME_COLUMN); - - TreeMap<Key,Value> tkv = new TreeMap<>(); - try (ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY)) { - scanner.setRange(extent.toMetadataRange()); - for (Entry<Key,Value> entry : scanner) { - tkv.put(entry.getKey(), entry.getValue()); - } + if (!extent.equals(meta.getExtent())) { + log.info("Tablet extent mismatch {} {}", extent, meta.getExtent()); + return false; } - // only populate map after success - tabletsKeyValues.clear(); - tabletsKeyValues.putAll(tkv); - - Text metadataEntry = extent.getMetadataEntry(); - - Value dir = checkTabletMetadata(extent, instance, tabletsKeyValues, metadataEntry); - if (dir == null) { - return null; - } - - Value oldPrevEndRow = null; - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - if 
(TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN.hasColumns(entry.getKey())) { - oldPrevEndRow = entry.getValue(); - } - } - - if (oldPrevEndRow != null) { - SortedMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries; - tabletEntries = MetadataTableUtil.getTabletEntries(tabletsKeyValues, columnsToFetch); - - KeyExtent fke = MasterMetadataUtil.fixSplit(context, metadataEntry, - tabletEntries.get(metadataEntry), lock); - - if (!fke.equals(extent)) { - return new Pair<>(null, fke); - } - - // reread and reverify metadata entries now that metadata entries were fixed - tabletsKeyValues.clear(); - return verifyTabletInformation(context, fke, instance, tabletsKeyValues, clientAddress, lock); - } - - return new Pair<>(new Text(dir.get()), null); - } - - static Value checkTabletMetadata(KeyExtent extent, TServerInstance instance, - SortedMap<Key,Value> tabletsKeyValues, Text metadataEntry) throws AccumuloException { - - TServerInstance future = null; - Value prevEndRow = null; - Value dir = null; - Value time = null; - for (Entry<Key,Value> entry : tabletsKeyValues.entrySet()) { - Key key = entry.getKey(); - if (!metadataEntry.equals(key.getRow())) { - log.info("Unexpected row in tablet metadata {} {}", metadataEntry, key.getRow()); - return null; - } - Text cf = key.getColumnFamily(); - if (cf.equals(TabletsSection.FutureLocationColumnFamily.NAME)) { - if (future != null) { - throw new AccumuloException("Tablet has multiple future locations " + extent); - } - future = new TServerInstance(entry.getValue(), key.getColumnQualifier()); - } else if (cf.equals(TabletsSection.CurrentLocationColumnFamily.NAME)) { - log.info("Tablet seems to be already assigned to {}", - new TServerInstance(entry.getValue(), key.getColumnQualifier())); - return null; - } else if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { - prevEndRow = entry.getValue(); - } else if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { - dir = entry.getValue(); - } else 
if (TabletsSection.ServerColumnFamily.TIME_COLUMN.hasColumns(key)) { - time = entry.getValue(); - } - } - - if (prevEndRow == null) { - throw new AccumuloException("Metadata entry does not have prev row (" + metadataEntry + ")"); - } else { - KeyExtent ke2 = new KeyExtent(metadataEntry, prevEndRow); - if (!extent.equals(ke2)) { - log.info("Tablet prev end row mismatch {} {}", extent, ke2.getPrevEndRow()); - return null; - } + if (meta.getDir() == null) { + throw new AccumuloException( + "Metadata entry does not have directory (" + meta.getExtent() + ")"); } - if (dir == null) { - throw new AccumuloException("Metadata entry does not have directory (" + metadataEntry + ")"); + if (meta.getTime() == null && !extent.equals(RootTable.EXTENT)) { + throw new AccumuloException("Metadata entry does not have time (" + meta.getExtent() + ")"); } - if (time == null && !extent.equals(RootTable.OLD_EXTENT)) { - throw new AccumuloException("Metadata entry does not have time (" + metadataEntry + ")"); - } + Location loc = meta.getLocation(); - if (future == null) { - log.info("The master has not assigned {} to {}", extent, instance); - return null; - } - - if (!instance.equals(future)) { - log.info("Table {} has been assigned to {} which is not {}", extent, future, instance); - return null; + if (loc == null || loc.getType() != LocationType.FUTURE + || !instance.equals(new TServerInstance(loc))) { + log.info("Unexpected location {} {}", extent, loc); + return false; } - return dir; + return true; } public String getClientAddressString() { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java index 5aaff34..53f5ebc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java @@ -16,38 +16,23 @@ */ package org.apache.accumulo.tserver.tablet; 
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; - import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.conf.AccumuloConfiguration; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVIterator; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.DataFileValue; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import 
org.apache.accumulo.core.tabletserver.log.LogEntry; @@ -56,20 +41,16 @@ import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeUtil; import org.apache.accumulo.server.master.state.TServerInstance; -import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.Text; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; /* * Basic information needed to create a tablet. */ public class TabletData { - private static Logger log = LoggerFactory.getLogger(TabletData.class); - private MetadataTime time = null; private SortedMap<FileRef,DataFileValue> dataFiles = new TreeMap<>(); private List<LogEntry> logEntries = new ArrayList<>(); @@ -82,58 +63,32 @@ public class TabletData { private String directory = null; // Read tablet data from metadata tables - public TabletData(KeyExtent extent, VolumeManager fs, Iterator<Entry<Key,Value>> entries) { - final Text family = new Text(); - Text rowName = extent.getMetadataEntry(); - while (entries.hasNext()) { - Entry<Key,Value> entry = entries.next(); - Key key = entry.getKey(); - Value value = entry.getValue(); - key.getColumnFamily(family); - if (key.compareRow(rowName) != 0) { - log.info("Unexpected metadata table entry for {}: {}", extent, key.getRow()); - continue; - } - if (ServerColumnFamily.TIME_COLUMN.hasColumns(entry.getKey())) { - if (time == null) { - time = MetadataTime.parse(value.toString()); - } - } else if (DataFileColumnFamily.NAME.equals(family)) { - FileRef ref = new FileRef(fs, key); - dataFiles.put(ref, new DataFileValue(entry.getValue().get())); - } else if (DIRECTORY_COLUMN.hasColumns(key)) { - directory = value.toString(); - } else if (family.equals(LogColumnFamily.NAME)) { - logEntries.add(LogEntry.fromKeyValue(key, entry.getValue())); 
- } else if (family.equals(ScanFileColumnFamily.NAME)) { - scanFiles.add(new FileRef(fs, key)); - } else if (FLUSH_COLUMN.hasColumns(key)) { - flushID = Long.parseLong(value.toString()); - } else if (COMPACT_COLUMN.hasColumns(key)) { - compactID = Long.parseLong(entry.getValue().toString()); - } else if (family.equals(LastLocationColumnFamily.NAME)) { - lastLocation = new TServerInstance(value, key.getColumnQualifier()); - } else if (family.equals(BulkFileColumnFamily.NAME)) { - Long id = MetadataTableUtil.getBulkLoadTid(value); - bulkImported.computeIfAbsent(id, l -> new ArrayList<FileRef>()).add(new FileRef(fs, key)); - } else if (PREV_ROW_COLUMN.hasColumns(key)) { - KeyExtent check = new KeyExtent(key.getRow(), value); - if (!check.equals(extent)) { - throw new RuntimeException("Found bad entry for " + extent + ": " + check); - } - } - } - if (time == null && dataFiles.isEmpty() && extent.equals(RootTable.OLD_EXTENT)) { - // recovery... old root tablet has no data, so time doesn't matter: - time = new MetadataTime(Long.MIN_VALUE, TimeType.LOGICAL); - } + public TabletData(KeyExtent extent, VolumeManager fs, TabletMetadata meta) { + + this.time = meta.getTime(); + this.compactID = meta.getCompactId().orElse(-1); + this.flushID = meta.getFlushId().orElse(-1); + this.directory = meta.getDir(); + this.logEntries.addAll(meta.getLogs()); + meta.getScans().forEach(path -> scanFiles.add(new FileRef(fs, path, meta.getTableId()))); + + if (meta.getLast() != null) + this.lastLocation = new TServerInstance(meta.getLast()); + + meta.getFilesMap().forEach((path, dfv) -> { + dataFiles.put(new FileRef(fs, path, meta.getTableId()), dfv); + }); + + meta.getLoaded().forEach((path, txid) -> { + bulkImported.computeIfAbsent(txid, k -> new ArrayList<FileRef>()) + .add(new FileRef(fs, path, meta.getTableId())); + }); } // Read basic root table metadata from zookeeper - public TabletData(ServerContext context, VolumeManager fs, AccumuloConfiguration conf) - throws IOException { - - 
TabletMetadata rootMeta = context.getAmple().readTablet(RootTable.EXTENT); + public TabletData(ServerContext context, VolumeManager fs, AccumuloConfiguration conf, + TabletMetadata rootMeta) throws IOException { + Preconditions.checkArgument(rootMeta.getExtent().equals(RootTable.EXTENT)); directory = VolumeUtil.switchRootTableVolume(context, rootMeta.getDir()); @@ -166,11 +121,7 @@ public class TabletData { } } - try { - logEntries = new ArrayList<>(rootMeta.getLogs()); - } catch (Exception ex) { - throw new RuntimeException("Unable to read tablet log entries", ex); - } + this.logEntries.addAll(rootMeta.getLogs()); } // Data pulled from an existing tablet to make a split diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/CheckTabletMetadataTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/CheckTabletMetadataTest.java index 15d552f..bdfa737 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/CheckTabletMetadataTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/CheckTabletMetadataTest.java @@ -17,9 +17,11 @@ package org.apache.accumulo.tserver; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import java.util.EnumSet; import java.util.TreeMap; import org.apache.accumulo.core.data.Key; @@ -27,6 +29,8 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.hadoop.io.Text; @@ -55,9 +59,11 @@ public class CheckTabletMetadataTest { 
private static void assertFail(TreeMap<Key,Value> tabletMeta, KeyExtent ke, TServerInstance tsi) { try { - assertNull(TabletServer.checkTabletMetadata(ke, tsi, tabletMeta, ke.getMetadataEntry())); + TabletMetadata tm = TabletMetadata.convertRow(tabletMeta.entrySet().iterator(), + EnumSet.allOf(ColumnType.class), true); + assertFalse(TabletServer.checkTabletMetadata(ke, tsi, tm)); } catch (Exception e) { - + e.printStackTrace(); } } @@ -66,9 +72,11 @@ public class CheckTabletMetadataTest { TreeMap<Key,Value> copy = new TreeMap<>(tabletMeta); assertNotNull(copy.remove(keyToDelete)); try { - assertNull(TabletServer.checkTabletMetadata(ke, tsi, copy, ke.getMetadataEntry())); + TabletMetadata tm = TabletMetadata.convertRow(copy.entrySet().iterator(), + EnumSet.allOf(ColumnType.class), true); + assertFalse(TabletServer.checkTabletMetadata(ke, tsi, tm)); } catch (Exception e) { - + e.printStackTrace(); } } @@ -87,7 +95,9 @@ public class CheckTabletMetadataTest { TServerInstance tsi = new TServerInstance("127.0.0.1:9997", 4); - assertNotNull(TabletServer.checkTabletMetadata(ke, tsi, tabletMeta, ke.getMetadataEntry())); + TabletMetadata tm = TabletMetadata.convertRow(tabletMeta.entrySet().iterator(), + EnumSet.allOf(ColumnType.class), true); + assertTrue(TabletServer.checkTabletMetadata(ke, tsi, tm)); assertFail(tabletMeta, ke, new TServerInstance("127.0.0.1:9998", 4)); assertFail(tabletMeta, ke, new TServerInstance("127.0.0.1:9998", 5)); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index 540f673..aaadfdc 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -18,6 +18,7 @@ package org.apache.accumulo.test.functional; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; +import static 
org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Collection; @@ -47,6 +48,7 @@ import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.fate.zookeeper.ZooLock; @@ -62,7 +64,6 @@ import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.util.MasterMetadataUtil; import org.apache.accumulo.server.util.MetadataTableUtil; import org.apache.accumulo.server.zookeeper.TransactionWatcher; -import org.apache.accumulo.tserver.TabletServer; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -171,6 +172,17 @@ public class SplitRecoveryIT extends ConfigurableMacBase { "localhost:1234", failPoint, zl); } + private static Map<Long,List<FileRef>> getBulkFilesLoaded(ServerContext context, + KeyExtent extent) { + Map<Long,List<FileRef>> bulkFiles = new HashMap<>(); + + context.getAmple().readTablet(extent).getLoaded() + .forEach((path, txid) -> bulkFiles.computeIfAbsent(txid, k -> new ArrayList<FileRef>()) + .add(new FileRef(context.getVolumeManager(), path, extent.getTableId()))); + + return bulkFiles; + } + private void splitPartiallyAndRecover(ServerContext context, KeyExtent extent, KeyExtent high, KeyExtent low, double splitRatio, SortedMap<FileRef,DataFileValue> mapFiles, Text midRow, String location, int steps, ZooLock zl) throws Exception { @@ -191,8 +203,8 @@ public class SplitRecoveryIT extends ConfigurableMacBase { writer.update(m); if (steps >= 1) { - Map<Long,? 
extends Collection<FileRef>> bulkFiles = - MetadataTableUtil.getBulkFilesLoaded(context, extent); + Map<Long,List<FileRef>> bulkFiles = getBulkFilesLoaded(context, extent); + MasterMetadataUtil.addNewTablet(context, low, "/lowDir", instance, lowDatafileSizes, bulkFiles, new MetadataTime(0, TimeType.LOGICAL), -1L, -1L, zl); } @@ -200,17 +212,19 @@ public class SplitRecoveryIT extends ConfigurableMacBase { MetadataTableUtil.finishSplit(high, highDatafileSizes, highDatafilesToRemove, context, zl); } - TabletServer.verifyTabletInformation(context, high, instance, new TreeMap<>(), "127.0.0.1:0", - zl); + TabletMetadata meta = context.getAmple().readTablet(high); + KeyExtent fixedExtent = MasterMetadataUtil.fixSplit(context, meta, zl); + + if (steps < 2) + assertEquals(splitRatio, meta.getSplitRatio(), 0.0); if (steps >= 1) { + assertEquals(high, fixedExtent); ensureTabletHasNoUnexpectedMetadataEntries(context, low, lowDatafileSizes); ensureTabletHasNoUnexpectedMetadataEntries(context, high, highDatafileSizes); - Map<Long,? extends Collection<FileRef>> lowBulkFiles = - MetadataTableUtil.getBulkFilesLoaded(context, low); - Map<Long,? extends Collection<FileRef>> highBulkFiles = - MetadataTableUtil.getBulkFilesLoaded(context, high); + Map<Long,? extends Collection<FileRef>> lowBulkFiles = getBulkFilesLoaded(context, low); + Map<Long,? 
extends Collection<FileRef>> highBulkFiles = getBulkFilesLoaded(context, high); if (!lowBulkFiles.equals(highBulkFiles)) { throw new Exception(" " + lowBulkFiles + " != " + highBulkFiles + " " + low + " " + high); @@ -220,6 +234,7 @@ public class SplitRecoveryIT extends ConfigurableMacBase { throw new Exception(" no bulk files " + low); } } else { + assertEquals(extent, fixedExtent); ensureTabletHasNoUnexpectedMetadataEntries(context, extent, mapFiles); } } @@ -243,14 +258,25 @@ public class SplitRecoveryIT extends ConfigurableMacBase { expectedColumnFamilies.add(TabletsSection.BulkFileColumnFamily.NAME); Iterator<Entry<Key,Value>> iter = scanner.iterator(); + + boolean sawPer = false; + while (iter.hasNext()) { - Key key = iter.next().getKey(); + Entry<Key,Value> entry = iter.next(); + Key key = entry.getKey(); if (!key.getRow().equals(extent.getMetadataEntry())) { throw new Exception( "Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key); } + if (TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)) { + sawPer = true; + if (!new KeyExtent(key.getRow(), entry.getValue()).equals(extent)) { + throw new Exception("Unexpected prev end row " + entry); + } + } + if (expectedColumnFamilies.contains(key.getColumnFamily())) { continue; } @@ -263,11 +289,12 @@ public class SplitRecoveryIT extends ConfigurableMacBase { "Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key); } - System.out.println("expectedColumns " + expectedColumns); if (expectedColumns.size() > 1 || (expectedColumns.size() == 1)) { throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns); } + assertTrue(sawPer); + SortedMap<FileRef,DataFileValue> fixedMapFiles = MetadataTableUtil.getFileAndLogEntries(context, extent).getSecond(); verifySame(expectedMapFiles, fixedMapFiles);