keith-turner commented on code in PR #5028:
URL: https://github.com/apache/accumulo/pull/5028#discussion_r1835761174
##########
core/src/main/java/org/apache/accumulo/core/Constants.java:
##########
@@ -91,6 +91,8 @@ public class Constants {
public static final String ZTABLE_LOCKS = "/table_locks";
public static final String ZMINI_LOCK = "/mini";
+ public static final String ZADMIN_LOCK = "/admin/lock";
+ public static final String ZTEST_LOCK = "/test/lock";
Review Comment:
Would it be possible to drop this constant and have the test code create
locks under the admin path when needed?
##########
test/src/main/java/org/apache/accumulo/test/fate/FateOpsCommandsIT.java:
##########
@@ -796,4 +870,20 @@ private boolean wordIsTStatus(String word) {
}
return true;
}
+
+ /**
+ * Stop the MANAGER. For some of our tests, we want to be able to seed
transactions with our own
+ * test repos. We want our fate to reserve these transactions (and not the
real fates running in
+ * the Manager as that will lead to exceptions since the real fates wouldn't
be able to handle our
+ * test repos). So, we essentially have the fates created here acting as the
real fates: they have
+ * the same threads running that the real fates would, use a fate store with
a ZK lock, use the
+ * same locations to store fate data that the Manager does, and are running
in a separate process
+ * from the Admin process. Note that we cannot simply use different
locations for our fate data
+ * from Manager to keep our test env separate from Manager. Admin uses the
real fate data
+ * locations, so our test must also use the real locations.
+ */
+ protected void stopManager() throws InterruptedException, IOException {
+ getCluster().getClusterControl().stopAllServers(ServerType.MANAGER);
+ Thread.sleep(20_000);
Review Comment:
Why sleep here?
##########
server/base/src/main/java/org/apache/accumulo/server/util/Admin.java:
##########
@@ -910,50 +939,111 @@ private void executeFateOpsCommand(ServerContext
context, FateOpsCommand fateOps
AdminUtil<Admin> admin = new AdminUtil<>(true);
final String zkRoot = context.getZooKeeperRoot();
- var zLockManagerPath = context.getServerPaths().createManagerPath();
var zTableLocksPath = context.getServerPaths().createTableLocksPath();
String fateZkPath = zkRoot + Constants.ZFATE;
ZooReaderWriter zk = context.getZooReaderWriter();
- MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk,
createDummyLockID(), null);
- UserFateStore<Admin> ufs = new UserFateStore<>(context,
createDummyLockID(), null);
- Map<FateInstanceType,FateStore<Admin>> fateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
- Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
-
- if (fateOpsCommand.cancel) {
- cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
- } else if (fateOpsCommand.fail) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ ServiceLock adminLock = null;
+ Map<FateInstanceType,FateStore<Admin>> fateStores;
+ Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores = null;
+
+ try {
+ if (fateOpsCommand.cancel) {
+ cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
+ } else if (fateOpsCommand.fail) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepFail(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ }
}
+ } else if (fateOpsCommand.delete) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepDelete(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+ }
+ admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ }
+ }
+
+ if (fateOpsCommand.print) {
+ final Set<FateId> fateIdFilter = new TreeSet<>();
+ fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
+ EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
+ getCmdLineStatusFilters(fateOpsCommand.states);
+ EnumSet<FateInstanceType> typesFilter =
+ getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
+ readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath);
+ admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out),
+ fateIdFilter, statusFilter, typesFilter);
+ // print line break at the end
+ System.out.println();
}
- } else if (fateOpsCommand.delete) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+
+ if (fateOpsCommand.summarize) {
+ if (readOnlyFateStores == null) {
+ readOnlyFateStores = createReadOnlyFateStores(context, zk,
fateZkPath);
}
- admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ }
+ } finally {
+ if (adminLock != null) {
+ adminLock.unlock();
}
}
+ }
- if (fateOpsCommand.print) {
- final Set<FateId> fateIdFilter = new TreeSet<>();
- fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
- EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
- getCmdLineStatusFilters(fateOpsCommand.states);
- EnumSet<FateInstanceType> typesFilter =
- getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
- admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out), fateIdFilter,
- statusFilter, typesFilter);
- // print line break at the end
- System.out.println();
- }
+ private Map<FateInstanceType,FateStore<Admin>>
createFateStores(ServerContext context,
+ ZooReaderWriter zk, String fateZkPath, ServiceLock adminLock)
+ throws InterruptedException, KeeperException {
+ var lockId = adminLock.getLockID();
+ MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk, lockId,
null);
+ UserFateStore<Admin> ufs =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockId,
null);
+ return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
+ }
- if (fateOpsCommand.summarize) {
- summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ private Map<FateInstanceType,ReadOnlyFateStore<Admin>>
+ createReadOnlyFateStores(ServerContext context, ZooReaderWriter zk,
String fateZkPath)
+ throws InterruptedException, KeeperException {
+ MetaFateStore<Admin> readOnlyMFS = new MetaFateStore<>(fateZkPath, zk,
null, null);
+ UserFateStore<Admin> readOnlyUFS =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null,
null);
+ return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER,
readOnlyUFS);
+ }
+
+ private ServiceLock createAdminLock(ServerContext context) throws
InterruptedException {
+ var zk = context.getZooReaderWriter().getZooKeeper();
+ UUID uuid = UUID.randomUUID();
+ ServiceLockPath slp = context.getServerPaths().createAdminLockPath();
+ ServiceLock adminLock = new
ServiceLock(context.getZooReaderWriter().getZooKeeper(), slp, uuid);
+ AdminLockWatcher lw = new AdminLockWatcher();
+ ServiceLockData.ServiceDescriptors descriptors = new
ServiceLockData.ServiceDescriptors();
+ descriptors.addService(new ServiceLockData.ServiceDescriptor(uuid,
+ ServiceLockData.ThriftService.NONE, "localhost",
Constants.DEFAULT_RESOURCE_GROUP_NAME));
+ ServiceLockData sld = new ServiceLockData(descriptors);
+ String lockPath = slp.toString();
+ String parentLockPath = lockPath.substring(0, lockPath.indexOf("/lock"));
Review Comment:
It would be nice to avoid this literal `"/lock"` if possible. Maybe this could
use `lockPath.split("/")` and work down the nodes in the result.
##########
server/base/src/main/java/org/apache/accumulo/server/util/Admin.java:
##########
@@ -368,6 +374,29 @@ static class FateOpsCommand {
List<String> instanceTypes = new ArrayList<>();
}
+ class AdminLockWatcher implements ServiceLock.AccumuloLockWatcher {
+ @Override
+ public void lostLock(ServiceLock.LockLossReason reason) {
+ log.warn("Lost lock: " + reason.toString());
+ }
+
+ @Override
+ public void unableToMonitorLockNode(Exception e) {
+ log.warn("Unable to monitor lock: " + e.getMessage());
Review Comment:
Could halt the process here too.
##########
test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java:
##########
@@ -341,8 +341,8 @@ private void
testDeadReservationsCleanup(TestStoreFactory<LatchTestEnv> testStor
// Create the new Fate/start the Fate threads (the work finder and the
workers).
// Don't run another dead reservation cleaner since we already have one
running from fate1.
- FastFate<LatchTestEnv> fate2 = new FastFate<>(testEnv2, store2, false,
Object::toString,
- DefaultConfiguration.getInstance());
+ Fate<LatchTestEnv> fate2 =
Review Comment:
Was this code needlessly using FastFate before? It's not running the dead
reservation cleaner, so it does not need to use FastFate?
##########
server/base/src/main/java/org/apache/accumulo/server/util/Admin.java:
##########
@@ -910,50 +939,111 @@ private void executeFateOpsCommand(ServerContext
context, FateOpsCommand fateOps
AdminUtil<Admin> admin = new AdminUtil<>(true);
final String zkRoot = context.getZooKeeperRoot();
- var zLockManagerPath = context.getServerPaths().createManagerPath();
var zTableLocksPath = context.getServerPaths().createTableLocksPath();
String fateZkPath = zkRoot + Constants.ZFATE;
ZooReaderWriter zk = context.getZooReaderWriter();
- MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk,
createDummyLockID(), null);
- UserFateStore<Admin> ufs = new UserFateStore<>(context,
createDummyLockID(), null);
- Map<FateInstanceType,FateStore<Admin>> fateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
- Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
-
- if (fateOpsCommand.cancel) {
- cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
- } else if (fateOpsCommand.fail) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ ServiceLock adminLock = null;
+ Map<FateInstanceType,FateStore<Admin>> fateStores;
+ Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores = null;
+
+ try {
+ if (fateOpsCommand.cancel) {
+ cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
+ } else if (fateOpsCommand.fail) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepFail(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ }
}
+ } else if (fateOpsCommand.delete) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepDelete(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+ }
+ admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ }
+ }
+
+ if (fateOpsCommand.print) {
+ final Set<FateId> fateIdFilter = new TreeSet<>();
+ fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
+ EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
+ getCmdLineStatusFilters(fateOpsCommand.states);
+ EnumSet<FateInstanceType> typesFilter =
+ getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
+ readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath);
+ admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out),
+ fateIdFilter, statusFilter, typesFilter);
+ // print line break at the end
+ System.out.println();
}
- } else if (fateOpsCommand.delete) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+
+ if (fateOpsCommand.summarize) {
+ if (readOnlyFateStores == null) {
+ readOnlyFateStores = createReadOnlyFateStores(context, zk,
fateZkPath);
}
- admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ }
+ } finally {
+ if (adminLock != null) {
+ adminLock.unlock();
}
}
+ }
- if (fateOpsCommand.print) {
- final Set<FateId> fateIdFilter = new TreeSet<>();
- fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
- EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
- getCmdLineStatusFilters(fateOpsCommand.states);
- EnumSet<FateInstanceType> typesFilter =
- getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
- admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out), fateIdFilter,
- statusFilter, typesFilter);
- // print line break at the end
- System.out.println();
- }
+ private Map<FateInstanceType,FateStore<Admin>>
createFateStores(ServerContext context,
+ ZooReaderWriter zk, String fateZkPath, ServiceLock adminLock)
+ throws InterruptedException, KeeperException {
+ var lockId = adminLock.getLockID();
+ MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk, lockId,
null);
+ UserFateStore<Admin> ufs =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockId,
null);
+ return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
+ }
- if (fateOpsCommand.summarize) {
- summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ private Map<FateInstanceType,ReadOnlyFateStore<Admin>>
+ createReadOnlyFateStores(ServerContext context, ZooReaderWriter zk,
String fateZkPath)
+ throws InterruptedException, KeeperException {
+ MetaFateStore<Admin> readOnlyMFS = new MetaFateStore<>(fateZkPath, zk,
null, null);
+ UserFateStore<Admin> readOnlyUFS =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null,
null);
+ return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER,
readOnlyUFS);
+ }
+
+ private ServiceLock createAdminLock(ServerContext context) throws
InterruptedException {
+ var zk = context.getZooReaderWriter().getZooKeeper();
+ UUID uuid = UUID.randomUUID();
+ ServiceLockPath slp = context.getServerPaths().createAdminLockPath();
+ ServiceLock adminLock = new
ServiceLock(context.getZooReaderWriter().getZooKeeper(), slp, uuid);
+ AdminLockWatcher lw = new AdminLockWatcher();
+ ServiceLockData.ServiceDescriptors descriptors = new
ServiceLockData.ServiceDescriptors();
+ descriptors.addService(new ServiceLockData.ServiceDescriptor(uuid,
+ ServiceLockData.ThriftService.NONE, "localhost",
Constants.DEFAULT_RESOURCE_GROUP_NAME));
+ ServiceLockData sld = new ServiceLockData(descriptors);
+ String lockPath = slp.toString();
+ String parentLockPath = lockPath.substring(0, lockPath.indexOf("/lock"));
+
+ try {
+ if (zk.exists(parentLockPath, false) == null) {
+ zk.create(parentLockPath, new byte[0], ZooUtil.PUBLIC,
CreateMode.PERSISTENT);
+ log.info("Created: {}", parentLockPath);
+ }
+ if (zk.exists(lockPath, false) == null) {
+ zk.create(lockPath, new byte[0], ZooUtil.PUBLIC,
CreateMode.PERSISTENT);
+ log.info("Created: {}", lockPath);
Review Comment:
```suggestion
log.info("Created: {} in zookeeper", lockPath);
```
##########
server/base/src/main/java/org/apache/accumulo/server/util/Admin.java:
##########
@@ -368,6 +374,29 @@ static class FateOpsCommand {
List<String> instanceTypes = new ArrayList<>();
}
+ class AdminLockWatcher implements ServiceLock.AccumuloLockWatcher {
+ @Override
+ public void lostLock(ServiceLock.LockLossReason reason) {
+ log.warn("Lost lock: " + reason.toString());
Review Comment:
May want to halt the process after the warning. If we are no longer holding
the lock, then any reservations may no longer be valid. Could use
org.apache.accumulo.core.util.Halt.
##########
core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java:
##########
@@ -504,39 +502,75 @@ public boolean
prepFail(Map<FateInstanceType,FateStore<T>> stores, ZooReaderWrit
// determine which store to use
FateStore<T> store = stores.get(fateId.getType());
- FateTxStore<T> txStore = store.reserve(fateId);
- try {
- TStatus ts = txStore.getStatus();
- switch (ts) {
- case UNKNOWN:
- System.out.println("Invalid fate ID: " + fateId);
- break;
-
- case SUBMITTED:
- case IN_PROGRESS:
- case NEW:
- System.out.printf("Failing transaction: %s (%s)%n", fateId, ts);
- txStore.setStatus(TStatus.FAILED_IN_PROGRESS);
- state = true;
- break;
-
- case SUCCESSFUL:
- System.out.printf("Transaction already completed: %s (%s)%n",
fateId, ts);
- break;
-
- case FAILED:
- case FAILED_IN_PROGRESS:
- System.out.printf("Transaction already failed: %s (%s)%n", fateId,
ts);
- state = true;
- break;
+ Optional<FateTxStore<T>> opTxStore = tryReserve(store, fateId, "fail");
+ if (opTxStore.isPresent()) {
+ var txStore = opTxStore.orElseThrow();
+
+ try {
+ TStatus ts = txStore.getStatus();
+ switch (ts) {
+ case UNKNOWN:
+ System.out.println("Invalid fate ID: " + fateId);
+ break;
+
+ case SUBMITTED:
+ case IN_PROGRESS:
+ case NEW:
+ System.out.printf("Failing transaction: %s (%s)%n", fateId, ts);
+ txStore.setStatus(TStatus.FAILED_IN_PROGRESS);
+ state = true;
+ break;
+
+ case SUCCESSFUL:
+ System.out.printf("Transaction already completed: %s (%s)%n",
fateId, ts);
+ break;
+
+ case FAILED:
+ case FAILED_IN_PROGRESS:
+ System.out.printf("Transaction already failed: %s (%s)%n", fateId,
ts);
+ state = true;
+ break;
+ }
+ } finally {
+ txStore.unreserve(Duration.ZERO);
}
- } finally {
- txStore.unreserve(Duration.ZERO);
}
return state;
}
+ /**
+ * Try to reserve the transaction for a minute. If it could not be reserved,
return an empty
+ * optional
+ */
+ private Optional<FateTxStore<T>> tryReserve(FateStore<T> store, FateId
fateId, String op) {
+ var retry = Retry.builder().maxRetriesWithinDuration(Duration.ofMinutes(1))
+ .retryAfter(Duration.ofMillis(25)).incrementBy(Duration.ofMillis(25))
+
.maxWait(Duration.ofSeconds(15)).backOffFactor(1.5).logInterval(Duration.ofSeconds(15))
+ .createRetry();
+
+ Optional<FateTxStore<T>> reserveAttempt = store.tryReserve(fateId);
+ while (reserveAttempt.isEmpty() && retry.canRetry()) {
+ retry.useRetry();
+ try {
+ retry.waitForNextAttempt(log, "Attempting to reserve " + fateId);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalArgumentException(e);
+ }
+ reserveAttempt = store.tryReserve(fateId);
+ }
+ if (reserveAttempt.isPresent()) {
+ retry.logCompletion(log, "Attempting to reserve " + fateId);
+ } else {
+ log.error("Could not {} {} in a reasonable time. This indicates the
Manager is currently "
+ + "working on {}. If {} {} is still desired, the Manager needs to be
stopped and "
+ + "the command needs to be rerun.", op, fateId, fateId, op, fateId);
Review Comment:
This is a nice error message. Could make it repeat info a bit less.
```suggestion
log.error("Could not {} {} in a reasonable time. This indicates the
Manager is currently "
+ "working on it. The manager may need to be stopped to complete
this command.", op, fateId);
```
##########
server/base/src/main/java/org/apache/accumulo/server/util/Admin.java:
##########
@@ -340,11 +346,11 @@ static class FateOpsCommand {
boolean cancel;
@Parameter(names = {"-f", "--fail"},
- description = "<FateId>... Transition FaTE transaction status to
FAILED_IN_PROGRESS (requires Manager to be down)")
+ description = "<FateId>... Transition FaTE transaction status to
FAILED_IN_PROGRESS")
boolean fail;
@Parameter(names = {"-d", "--delete"},
- description = "<FateId>... Delete locks associated with transactions
(Requires Manager to be down)")
+ description = "<FateId>... Delete locks associated with transactions")
Review Comment:
This help message is not quite correct. This was a preexisting problem
though.
```suggestion
description = "<FateId>... Delete fate transaction and its
associated table locks")
```
##########
server/base/src/main/java/org/apache/accumulo/server/util/Admin.java:
##########
@@ -910,50 +939,111 @@ private void executeFateOpsCommand(ServerContext
context, FateOpsCommand fateOps
AdminUtil<Admin> admin = new AdminUtil<>(true);
final String zkRoot = context.getZooKeeperRoot();
- var zLockManagerPath = context.getServerPaths().createManagerPath();
var zTableLocksPath = context.getServerPaths().createTableLocksPath();
String fateZkPath = zkRoot + Constants.ZFATE;
ZooReaderWriter zk = context.getZooReaderWriter();
- MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk,
createDummyLockID(), null);
- UserFateStore<Admin> ufs = new UserFateStore<>(context,
createDummyLockID(), null);
- Map<FateInstanceType,FateStore<Admin>> fateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
- Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
-
- if (fateOpsCommand.cancel) {
- cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
- } else if (fateOpsCommand.fail) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ ServiceLock adminLock = null;
+ Map<FateInstanceType,FateStore<Admin>> fateStores;
+ Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores = null;
+
+ try {
+ if (fateOpsCommand.cancel) {
+ cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
+ } else if (fateOpsCommand.fail) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepFail(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ }
}
+ } else if (fateOpsCommand.delete) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepDelete(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+ }
+ admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ }
+ }
+
+ if (fateOpsCommand.print) {
+ final Set<FateId> fateIdFilter = new TreeSet<>();
+ fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
+ EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
+ getCmdLineStatusFilters(fateOpsCommand.states);
+ EnumSet<FateInstanceType> typesFilter =
+ getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
+ readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath);
+ admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out),
+ fateIdFilter, statusFilter, typesFilter);
+ // print line break at the end
+ System.out.println();
}
- } else if (fateOpsCommand.delete) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+
+ if (fateOpsCommand.summarize) {
+ if (readOnlyFateStores == null) {
+ readOnlyFateStores = createReadOnlyFateStores(context, zk,
fateZkPath);
}
- admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ }
+ } finally {
+ if (adminLock != null) {
+ adminLock.unlock();
}
}
+ }
- if (fateOpsCommand.print) {
- final Set<FateId> fateIdFilter = new TreeSet<>();
- fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
- EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
- getCmdLineStatusFilters(fateOpsCommand.states);
- EnumSet<FateInstanceType> typesFilter =
- getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
- admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out), fateIdFilter,
- statusFilter, typesFilter);
- // print line break at the end
- System.out.println();
- }
+ private Map<FateInstanceType,FateStore<Admin>>
createFateStores(ServerContext context,
+ ZooReaderWriter zk, String fateZkPath, ServiceLock adminLock)
+ throws InterruptedException, KeeperException {
+ var lockId = adminLock.getLockID();
+ MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk, lockId,
null);
+ UserFateStore<Admin> ufs =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockId,
null);
+ return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
+ }
- if (fateOpsCommand.summarize) {
- summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ private Map<FateInstanceType,ReadOnlyFateStore<Admin>>
+ createReadOnlyFateStores(ServerContext context, ZooReaderWriter zk,
String fateZkPath)
+ throws InterruptedException, KeeperException {
+ MetaFateStore<Admin> readOnlyMFS = new MetaFateStore<>(fateZkPath, zk,
null, null);
+ UserFateStore<Admin> readOnlyUFS =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null,
null);
+ return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER,
readOnlyUFS);
+ }
+
+ private ServiceLock createAdminLock(ServerContext context) throws
InterruptedException {
+ var zk = context.getZooReaderWriter().getZooKeeper();
+ UUID uuid = UUID.randomUUID();
+ ServiceLockPath slp = context.getServerPaths().createAdminLockPath();
+ ServiceLock adminLock = new
ServiceLock(context.getZooReaderWriter().getZooKeeper(), slp, uuid);
+ AdminLockWatcher lw = new AdminLockWatcher();
+ ServiceLockData.ServiceDescriptors descriptors = new
ServiceLockData.ServiceDescriptors();
+ descriptors.addService(new ServiceLockData.ServiceDescriptor(uuid,
+ ServiceLockData.ThriftService.NONE, "localhost",
Constants.DEFAULT_RESOURCE_GROUP_NAME));
Review Comment:
Could use something that is more distinct than localhost, like
`fake_admin_util_host`.
##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -275,6 +269,10 @@ protected void verifyFateKey(FateId fateId,
Optional<FateKey> fateKeySeen,
"Collision detected for fate id " + fateId);
}
+ protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) {
Review Comment:
Feel free to ignore this comment; it was something I noticed, but I am not sure
what changes, if any, to make based on this observation.
In every place where this method is called, the next line is something like
`FateReservation.from(lockID, UUID.randomUUID())`. The call to
FateReservation.from will fail if the lockId is null, so this method may be
redundant; however, its error message is more informative because it includes
the fate id.
##########
server/base/src/main/java/org/apache/accumulo/server/util/Admin.java:
##########
@@ -910,50 +939,111 @@ private void executeFateOpsCommand(ServerContext
context, FateOpsCommand fateOps
AdminUtil<Admin> admin = new AdminUtil<>(true);
final String zkRoot = context.getZooKeeperRoot();
- var zLockManagerPath = context.getServerPaths().createManagerPath();
var zTableLocksPath = context.getServerPaths().createTableLocksPath();
String fateZkPath = zkRoot + Constants.ZFATE;
ZooReaderWriter zk = context.getZooReaderWriter();
- MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk,
createDummyLockID(), null);
- UserFateStore<Admin> ufs = new UserFateStore<>(context,
createDummyLockID(), null);
- Map<FateInstanceType,FateStore<Admin>> fateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
- Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores =
- Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
-
- if (fateOpsCommand.cancel) {
- cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
- } else if (fateOpsCommand.fail) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepFail(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ ServiceLock adminLock = null;
+ Map<FateInstanceType,FateStore<Admin>> fateStores;
+ Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores = null;
+
+ try {
+ if (fateOpsCommand.cancel) {
+ cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
+ } else if (fateOpsCommand.fail) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepFail(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ }
}
+ } else if (fateOpsCommand.delete) {
+ adminLock = createAdminLock(context);
+ fateStores = createFateStores(context, zk, fateZkPath, adminLock);
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepDelete(fateStores, fateIdStr)) {
+ throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+ }
+ admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ }
+ }
+
+ if (fateOpsCommand.print) {
+ final Set<FateId> fateIdFilter = new TreeSet<>();
+ fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
+ EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
+ getCmdLineStatusFilters(fateOpsCommand.states);
+ EnumSet<FateInstanceType> typesFilter =
+ getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
+ readOnlyFateStores = createReadOnlyFateStores(context, zk, fateZkPath);
+ admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out),
+ fateIdFilter, statusFilter, typesFilter);
+ // print line break at the end
+ System.out.println();
}
- } else if (fateOpsCommand.delete) {
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepDelete(fateStores, zk, zLockManagerPath, fateIdStr)) {
- throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+
+ if (fateOpsCommand.summarize) {
+ if (readOnlyFateStores == null) {
+ readOnlyFateStores = createReadOnlyFateStores(context, zk,
fateZkPath);
}
- admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
+ summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ }
+ } finally {
+ if (adminLock != null) {
+ adminLock.unlock();
}
}
+ }
- if (fateOpsCommand.print) {
- final Set<FateId> fateIdFilter = new TreeSet<>();
- fateOpsCommand.fateIdList.forEach(fateIdStr ->
fateIdFilter.add(FateId.from(fateIdStr)));
- EnumSet<ReadOnlyFateStore.TStatus> statusFilter =
- getCmdLineStatusFilters(fateOpsCommand.states);
- EnumSet<FateInstanceType> typesFilter =
- getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
- admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out), fateIdFilter,
- statusFilter, typesFilter);
- // print line break at the end
- System.out.println();
- }
+ private Map<FateInstanceType,FateStore<Admin>>
createFateStores(ServerContext context,
+ ZooReaderWriter zk, String fateZkPath, ServiceLock adminLock)
+ throws InterruptedException, KeeperException {
+ var lockId = adminLock.getLockID();
+ MetaFateStore<Admin> mfs = new MetaFateStore<>(fateZkPath, zk, lockId,
null);
+ UserFateStore<Admin> ufs =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), lockId,
null);
+ return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
+ }
- if (fateOpsCommand.summarize) {
- summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
+ private Map<FateInstanceType,ReadOnlyFateStore<Admin>>
+ createReadOnlyFateStores(ServerContext context, ZooReaderWriter zk,
String fateZkPath)
+ throws InterruptedException, KeeperException {
+ MetaFateStore<Admin> readOnlyMFS = new MetaFateStore<>(fateZkPath, zk,
null, null);
+ UserFateStore<Admin> readOnlyUFS =
+ new UserFateStore<>(context, AccumuloTable.FATE.tableName(), null,
null);
+ return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER,
readOnlyUFS);
+ }
+
+ private ServiceLock createAdminLock(ServerContext context) throws
InterruptedException {
+ var zk = context.getZooReaderWriter().getZooKeeper();
+ UUID uuid = UUID.randomUUID();
+ ServiceLockPath slp = context.getServerPaths().createAdminLockPath();
+ ServiceLock adminLock = new
ServiceLock(context.getZooReaderWriter().getZooKeeper(), slp, uuid);
+ AdminLockWatcher lw = new AdminLockWatcher();
+ ServiceLockData.ServiceDescriptors descriptors = new
ServiceLockData.ServiceDescriptors();
+ descriptors.addService(new ServiceLockData.ServiceDescriptor(uuid,
+ ServiceLockData.ThriftService.NONE, "localhost",
Constants.DEFAULT_RESOURCE_GROUP_NAME));
+ ServiceLockData sld = new ServiceLockData(descriptors);
+ String lockPath = slp.toString();
+ String parentLockPath = lockPath.substring(0, lockPath.indexOf("/lock"));
+
+ try {
+ if (zk.exists(parentLockPath, false) == null) {
+ zk.create(parentLockPath, new byte[0], ZooUtil.PUBLIC,
CreateMode.PERSISTENT);
+ log.info("Created: {}", parentLockPath);
Review Comment:
```suggestion
log.info("Created: {} in zookeeper", parentLockPath);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]