keith-turner commented on code in PR #4691:
URL: https://github.com/apache/accumulo/pull/4691#discussion_r1664903815
##########
test/src/main/java/org/apache/accumulo/test/upgrade/ScanServerUpgrade11to12TestIT.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.upgrade;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.AccumuloTable;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.manager.upgrade.Upgrader11to12;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.net.HostAndPort;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class ScanServerUpgrade11to12TestIT extends SharedMiniClusterBase {
+
+  public static final Logger log = LoggerFactory.getLogger(ScanServerUpgrade11to12TestIT.class);
+
+  private static class ScanServerUpgradeITConfiguration
+      implements MiniClusterConfigurationCallback {
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(0);
+    }
+  }
+
+  @BeforeAll
+  public static void start() throws Exception {
+    ScanServerUpgradeITConfiguration c = new ScanServerUpgradeITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    stopMiniCluster();
+  }
+
+  private Stream<Map.Entry<Key,Value>> getOldScanServerRefs(String tableName) {
+    try {
+      BatchScanner scanner =
+          getCluster().getServerContext().createBatchScanner(tableName, Authorizations.EMPTY);
+      scanner.setRanges(Upgrader11to12.OLD_SCAN_SERVERS_RANGES);
+      return scanner.stream().onClose(scanner::close);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException("Unable to find table " + tableName);
+    }
+  }
+
+  private void testMetadataScanServerRefRemoval(String tableName) {
+
+    HostAndPort server = HostAndPort.fromParts("127.0.0.1", 1234);
+    UUID serverLockUUID = UUID.randomUUID();
+
+    Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0000070.rf", "F0000071.rf", "F0000072.rf")
+        .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/" + f)
+        .map(f -> new ScanServerRefTabletFile(serverLockUUID, server.toString(), f))
+        .collect(Collectors.toSet());
+
+    ServerContext ctx = getCluster().getServerContext();
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().removeProperty(tableName,
+          Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1");
+      log.info("Removed constraints from table {}", tableName);
+      Thread.sleep(10_000);
+    } catch (AccumuloException | AccumuloSecurityException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    try (BatchWriter writer = ctx.createBatchWriter(tableName)) {
+      for (ScanServerRefTabletFile ref : scanRefs) {
+        Mutation sservMutation = new Mutation("~sserv" + ref.getFilePath());
+        sservMutation.put(ref.getServerAddress(), new Text(ref.getServerLockUUID().toString()),
+            new Value(""));
+        writer.addMutation(sservMutation);
+
+        Mutation scanRefMutation = new Mutation("~scanref" + ref.getServerLockUUID().toString());
+        scanRefMutation.put(ref.getServerAddress(), ref.getFilePath(), new Value(""));
+        writer.addMutation(scanRefMutation);
+      }
+      writer.flush();
+    } catch (TableNotFoundException | MutationsRejectedException e) {
+      log.warn("Failed to write mutations to metadata table");
+      throw new RuntimeException(e);
+    }
+
+    // Check that ample cannot find these scan server refs
+    assertEquals(0, ctx.getAmple().scanServerRefs().list().count());
+
+    // Ensure they exist on the metadata table
+    assertEquals(scanRefs.size() * 2L, getOldScanServerRefs(tableName).count());
+
+    var upgrader = new Upgrader11to12();
+    upgrader.removeScanServerRange(ctx, tableName);
+
+    // Ensure entries are now removed from the metadata table
+    assertEquals(0, getOldScanServerRefs(tableName).count());
+  }
+
+  @Test
+  public void testScanRefTableCreation() {
+    ServerContext ctx = getCluster().getServerContext();
+    // Remove the scan server table that was created as part of init
+    try {
+      ctx.getTableManager().removeTable(AccumuloTable.SCAN_REF.tableId());
+      Thread.sleep(10_000);

Review Comment:
This is a really neat way to test this, removing the table. I think that call will only remove the table info in ZooKeeper, though. Could also remove the corresponding entries from the metadata table and DFS. For the metadata table, could use the following range

```java
// range that covers all tablets of the scan ref table in the metadata table
var metaRange = new Range(AccumuloTable.SCAN_REF.tableId() + ";", AccumuloTable.SCAN_REF.tableId() + "<");
```

and do the following in the test (a sketch of this flow follows the list):

* Scan `metaRange` in the metadata table at the beginning of the test; it should contain something.
* Delete everything in `metaRange` from the metadata table.
* After calling the upgrade code, scan `metaRange` again; it should contain something once more.

For DFS, would need to find the table's directory and nuke it, then verify something is re-created after calling the upgrade code. The table is probably hosted while all of this mayhem is happening, but that is probably OK. Will have some unhappy tservers.
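For the metadata-table half, a minimal sketch of that scan/delete/re-scan flow, assuming the test's `AccumuloClient` is available. The helper names (`verifyScanRefTabletRecreated`, `countEntries`) and the `Runnable` used to trigger the upgrade code are hypothetical, and the sketch assumes imports for `java.util.Collections`, `BatchDeleter`, `Scanner`, `Range`, and JUnit's `assertTrue` in addition to those already in the test.

```java
// Hypothetical helper for the suggested flow; how the 11->12 upgrade code is
// invoked is left to the caller via the runUpgrade Runnable.
private void verifyScanRefTabletRecreated(AccumuloClient client, Runnable runUpgrade)
    throws Exception {
  // range that covers all tablets of the scan ref table in the metadata table
  Range metaRange = new Range(AccumuloTable.SCAN_REF.tableId() + ";",
      AccumuloTable.SCAN_REF.tableId() + "<");

  // 1. the scan ref tablet entries created by init should be present
  assertTrue(countEntries(client, metaRange) > 0);

  // 2. delete everything in metaRange from the metadata table
  try (BatchDeleter deleter =
      client.createBatchDeleter(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY, 1)) {
    deleter.setRanges(Collections.singleton(metaRange));
    deleter.delete();
  }
  assertEquals(0, countEntries(client, metaRange));

  // 3. run the upgrade code; it should re-create the tablet entries
  runUpgrade.run();
  assertTrue(countEntries(client, metaRange) > 0);
}

private long countEntries(AccumuloClient client, Range range) throws Exception {
  try (Scanner scanner =
      client.createScanner(AccumuloTable.METADATA.tableName(), Authorizations.EMPTY)) {
    scanner.setRange(range);
    return scanner.stream().count();
  }
}
```

The DFS half would be analogous: resolve the table's directory under the instance volumes, remove it with Hadoop's `FileSystem.delete(path, true)`, and assert that a directory is re-created once the upgrade code runs.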
##########
server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java:
##########
@@ -218,4 +193,38 @@ private void createEntriesForTablet(TreeMap<Key,Value> map, Tablet tablet) {
   private void addEntry(TreeMap<Key,Value> map, Text row, ColumnFQ col, Value value) {
     map.put(new Key(row, col.getColumnFamily(), col.getColumnQualifier(), 0), value);
   }
+
+  public String createScanRefTablet(ServerContext context, VolumeManager fs) throws IOException {
+    // First set table properties in ZK
+    setTableProperties(context, AccumuloTable.SCAN_REF.tableId(),
+        initConfig.getScanRefTableConf());
+
+    VolumeChooserEnvironment chooserEnv =
+        new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT,
+            AccumuloTable.METADATA.tableId(), SPLIT_POINT, context);
+
+    String tableMetadataTabletDirUri =
+        fs.choose(chooserEnv, context.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+            + AccumuloTable.METADATA.tableId() + Path.SEPARATOR + TABLE_TABLETS_TABLET_DIR;
+
+    // Create Default Tablet entry for ScanServerRefTable
+    chooserEnv = new VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT,
+        AccumuloTable.SCAN_REF.tableId(), null, context);
+
+    String scanRefTableDefaultTabletDirName =
+        MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
+    String scanRefTableDefaultTabletDirUri =
+        fs.choose(chooserEnv, context.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+            + AccumuloTable.SCAN_REF.tableId() + Path.SEPARATOR + scanRefTableDefaultTabletDirName;
+
+    createDirectories(fs, tableMetadataTabletDirUri, scanRefTableDefaultTabletDirUri);
+
+    String ext = FileOperations.getNewFileExtension(DefaultConfiguration.getInstance());
+    String metadataFileName = tableMetadataTabletDirUri + Path.SEPARATOR + "0.1." + ext;
+
+    Tablet scanRefTablet =
+        new Tablet(AccumuloTable.SCAN_REF.tableId(), scanRefTableDefaultTabletDirName, null, null);
+    createMetadataFile(fs, metadataFileName, scanRefTablet);
+
+    return metadataFileName;

Review Comment:
The upgrade code is not doing anything with the return value of this method. The init code will add a pointer to this file to another file the init code is creating. That sort of file manipulation cannot really be done in the upgrade code; it needs to write to the metadata table instead. However, I think these changes can be used with a few small tweaks, starting with the following change to what is returned.

```suggestion
    InitialTablet scanRefTablet = new InitialTablet(AccumuloTable.SCAN_REF.tableId(),
        scanRefTableDefaultTabletDirName, null, null);
    return scanRefTablet;
```

With the above change in return type, hopefully something like the following could also be done. The goal is to be able to create key/values for an initial file in the init code and a mutation for a metadata table update in the upgrade code (a sketch follows the list).

* Rename `FileSystemInitializer.Tablet` to `FileSystemInitializer.InitialTablet` and make it public. The rename makes the name more unique, since it will be visible outside the class.
* Create a method `TreeMap<Key,Value> createEntries()` in `InitialTablet`, moving and refactoring the existing `FileSystemInitializer.createEntriesForTablet` into this new method.
* Create a method `Mutation createMutation()` in `InitialTablet` that calls `createEntries()` and converts the result to a mutation.
* In the upgrade code, use the returned `InitialTablet` to create a mutation and insert it into the metadata table.
* In the init code, use the returned `InitialTablet` to create key/value entries in the file used for initial metadata. The init code is already doing this, so it would just be a refactoring that moves the file creation outside of the `createScanRefTablet()` method.
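As a rough illustration of those bullets, here is a minimal sketch of what `InitialTablet` could look like. The class shape, constructor, and the single directory-column entry are assumptions for illustration; the real `createEntries()` would carry over everything `createEntriesForTablet` currently writes (time, prev-row, and file columns). Assumed imports: `java.util.TreeMap`, `org.apache.accumulo.core.data.{Key,Mutation,TableId,Value}`, `org.apache.hadoop.io.Text`, and `MetadataSchema.TabletsSection.ServerColumnFamily`.

```java
// Hypothetical sketch of the renamed class; only the directory column is shown.
public static class InitialTablet {
  final TableId tableId;
  final String dirName;
  final Text prevEndRow;
  final Text endRow;

  public InitialTablet(TableId tableId, String dirName, Text prevEndRow, Text endRow) {
    this.tableId = tableId;
    this.dirName = dirName;
    this.prevEndRow = prevEndRow;
    this.endRow = endRow;
  }

  // used by init code to build the key/value entries for the initial metadata file
  public TreeMap<Key,Value> createEntries() {
    // metadata row for this tablet: "<tableId>;<endRow>", or "<tableId><" for the default tablet
    Text row = endRow == null ? new Text(tableId + "<") : new Text(tableId + ";" + endRow);
    TreeMap<Key,Value> entries = new TreeMap<>();
    entries.put(new Key(row, ServerColumnFamily.DIRECTORY_COLUMN.getColumnFamily(),
        ServerColumnFamily.DIRECTORY_COLUMN.getColumnQualifier(), 0), new Value(dirName));
    // ... time, prev-row, and file columns elided ...
    return entries;
  }

  // used by upgrade code: all entries for one tablet share the same row,
  // so they collapse into a single metadata table mutation
  public Mutation createMutation() {
    TreeMap<Key,Value> entries = createEntries();
    Mutation mutation = new Mutation(entries.firstKey().getRow());
    entries.forEach((k, v) -> mutation.put(k.getColumnFamily(), k.getColumnQualifier(), v));
    return mutation;
  }
}
```

The design point is that both code paths derive from a single source of truth for the tablet's columns: init keeps writing `createEntries()` output into the initial metadata file, while the upgrade path hands `createMutation()` output to a writer on the metadata table.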
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
