keith-turner commented on code in PR #5416:
URL: https://github.com/apache/accumulo/pull/5416#discussion_r2017750656

##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -568,10 +573,13 @@ private class MigrationCleanupThread implements Runnable {
     @Override
     public void run() {
       while (stillManager()) {
-        if (!migrations.isEmpty()) {
+        // migrations are stored in the metadata tables which cannot be read until the
+        // TabletGroupWatchers are started
+        if (watchersStarted.get() && numMigrations() > 0) {

Review Comment:
Instead of having this `watchersStarted` flag, you could move the start of the cleanup thread to the point where the flag is set to true.

##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -568,10 +573,13 @@ private class MigrationCleanupThread implements Runnable {
     @Override
     public void run() {
       while (stillManager()) {
-        if (!migrations.isEmpty()) {
+        // migrations are stored in the metadata tables which cannot be read until the
+        // TabletGroupWatchers are started
+        if (watchersStarted.get() && numMigrations() > 0) {
           try {
             cleanupOfflineMigrations();

Review Comment:
It would probably be more efficient to collapse this function into cleanupDeletedMigrations() and scan the metadata table once.

```java
for (var tabletMetadata : tabletsMetadata) {
  // delete the migration if its destination tserver is not online or its table is offline
  if (tabletMetadata.getMigration() != null
      && (!onlineTabletServers().contains(tabletMetadata.getMigration())
          || OFFLINE == getTableState(tabletMetadata.getExtent()))) {
    tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation()
        .requireMigration(tabletMetadata.getMigration()).deleteMigration().submit(tm -> false);
  }
}
```

##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -984,23 +1031,23 @@ private long balanceTablets() {
           continue;
         }
         migrationsOutForLevel++;
-        migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+        getContext().getAmple().mutateTablet(ke)

Review Comment:
This will do a single write RPC and wait for it.
It would be better to create a tablets mutator once and use it for the entire loop. This allows much more efficient streaming of data to the write-ahead logs on the tablet servers, instead of making a single RPC that does a single write-ahead log write on the tablet server and waiting for it.

##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -586,14 +594,9 @@ public void run() {
    * the metadata table and remove any migrating tablets that no longer exist.
    */
   private void cleanupNonexistentMigrations(final ClientContext clientContext) {
+    Map<DataLevel,Set<KeyExtent>> notSeen = partitionMigrations();

Review Comment:
This entire function can probably be removed. When migrations were kept in memory, that set could get out of sync with the metadata table. With migrations stored in the metadata table this should not happen if the following conditions are met.

1. Migrations are added using a conditional mutation that checks that the tablet exists.
2. Split and merge operations on a tablet remove its migrations.
3. Deleting a table deletes all of its metadata, including migrations. This should happen without any changes in this PR.

These conditions are being met by the changes in this PR AFAICT.
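
As a rough illustration of the three conditions above, here is a minimal in-memory sketch. It does not use Ample or any other Accumulo API: `MigrationSketch`, its methods, and the string-keyed map standing in for the metadata table are all hypothetical names chosen for this example.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the metadata table: one row per tablet extent.
// None of these names come from Accumulo; they only illustrate the three conditions.
class MigrationSketch {

  // Row value: the migration destination, if any, of an existing tablet.
  private final Map<String,Optional<String>> rows = new ConcurrentHashMap<>();

  void createTablet(String extent) {
    rows.put(extent, Optional.empty());
  }

  // Condition 1: a conditional write that is applied only if the tablet row exists.
  boolean addMigration(String extent, String tserver) {
    return rows.computeIfPresent(extent, (e, old) -> Optional.of(tserver)) != null;
  }

  // Condition 2: a split replaces the parent row with child rows, so the parent's
  // migration disappears together with the parent.
  void split(String parent, String left, String right) {
    if (rows.remove(parent) != null) {
      createTablet(left);
      createTablet(right);
    }
  }

  // Condition 3: deleting a table removes all of its rows, migrations included.
  void deleteTable(String tablePrefix) {
    rows.keySet().removeIf(extent -> extent.startsWith(tablePrefix));
  }

  long numMigrations() {
    return rows.values().stream().filter(Optional::isPresent).count();
  }
}
```

Because the migration lives in the same row as the tablet, there is no separate collection that can drift out of sync with it, which is the reason a dedicated cleanup pass for nonexistent tablets should no longer be needed.
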

##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -629,6 +635,30 @@ private void cleanupOfflineMigrations() {
       }
     }
   }
+
+  /**
+   * Remove any migrations to any of the deleted TServers
+   */
+  private void cleanupDeletedMigrations() {
+    synchronized (deletedTServers) {
+      var iter = deletedTServers.iterator();
+      var ample = getContext().getAmple();
+      while (iter.hasNext()) {
+        var deletedTServer = iter.next();
+        for (DataLevel dl : DataLevel.values()) {
+          // prev row needed for the extent
+          try (var tabletsMetadata = ample.readTablets().forLevel(dl)
+              .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION)
+              .build()) {
+            for (var tabletMetadata : tabletsMetadata) {
+              conditionallyDeleteMigration(tabletMetadata.getExtent(), deletedTServer);
+            }
+          }
+        }
+        iter.remove();
+      }
+    }
+  }

Review Comment:
Tracking these deleted tservers is probably not needed; the set of online tablet servers can be used instead. It would also be better to create a single tablets mutator and use it for all tablets instead of creating one per tablet. This allows streaming data from the metadata table to the tablet server write-ahead logs.

```suggestion
      var ample = getContext().getAmple();
      for (DataLevel dl : DataLevel.values()) {
        // prev row needed for the extent
        try (var tabletsMetadata = ample.readTablets().forLevel(dl)
            .fetch(TabletMetadata.ColumnType.PREV_ROW, TabletMetadata.ColumnType.MIGRATION)
            .build();
            var tabletsMutator = ample.conditionallyMutateTablets(result -> {})) {
          for (var tabletMetadata : tabletsMetadata) {
            if (tabletMetadata.getMigration() != null
                && !onlineTabletServers().contains(tabletMetadata.getMigration())) {
              tabletsMutator.mutateTablet(tabletMetadata.getExtent()).requireAbsentOperation()
                  .requireMigration(tabletMetadata.getMigration()).deleteMigration()
                  .submit(tm -> false);
            }
          }
        }
      }
```

##########
server/manager/src/main/java/org/apache/accumulo/manager/Manager.java:
##########
@@ -984,23 +1031,23 @@ private long balanceTablets() {
           continue;
         }
         migrationsOutForLevel++;
-        migrations.put(ke, TabletServerIdImpl.toThrift(m.getNewTabletServer()));
+        getContext().getAmple().mutateTablet(ke)
+            .putMigration(TabletServerIdImpl.toThrift(m.getNewTabletServer())).mutate();
         log.debug("migration {}", m);
       }
       totalMigrationsOut += migrationsOutForLevel;
       // increment this at end of loop to signal complete run w/o any continue
       levelsCompleted++;
     }
-    balancerMetrics.assignMigratingCount(migrations::size);
+    balancerMetrics.assignMigratingCount(Manager.this::numMigrations);

Review Comment:
This will scan the metadata table again. It was already scanned earlier, so I wonder if we could use the map and a count of the new migrations added here to get the new count. Not sure if that will work, but it may be something to look into.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org