kevinrr888 commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1688548666
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +157,95 @@ protected void create(FateId fateId, FateKey fateKey) {
+ " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
}
+ @Override
+ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+ // Create a unique FateReservation for this reservation attempt
+ FateReservation reservation = FateReservation.from(lockID,
UUID.randomUUID());
+
+ FateMutator.Status status =
newMutator(fateId).putReservedTx(reservation).tryMutate();
+ if (status.equals(FateMutator.Status.ACCEPTED)) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+ // If the status is UNKNOWN, this means an error occurred after the
mutation was
+ // sent to the TabletServer, and it is unknown if the mutation was
written. We
+ // need to check if the mutation was written and if it was written by
this
+ // attempt at reservation. If it was written by this reservation attempt,
+ // we can return the FateTxStore since it was successfully reserved in
this
+ // attempt, otherwise we return empty (was written by another reservation
+ // attempt or was not written at all).
+ try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRange(getRow(fateId));
+
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+ TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+ FateReservation persistedRes = scanner.stream()
+ .filter(entry ->
FateReservation.isFateReservation(entry.getValue().toString()))
+ .map(entry ->
FateReservation.from(entry.getValue().toString())).findFirst()
+ .orElse(null);
+ if (persistedRes != null && persistedRes.equals(reservation)) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ }
Review Comment:
I wrote what would be the equivalent stream needed, and it was pretty
complicated. I personally think the way it is now is a bit more readable/easy
to follow. Since it would be the same, I'm gonna leave it as is.
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -106,13 +112,95 @@ public FateId create() {
@Override
protected void create(FateId fateId, FateKey key) {
try {
- zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW,
key).serialize(),
+ zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null,
key).serialize(),
NodeExistsPolicy.FAIL);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
+ @Override
+ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+ // uniquely identify this attempt to reserve the fate operation data
+ FateReservation reservation = FateReservation.from(lockID,
UUID.randomUUID());
+
+ try {
+ byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId),
currSerNodeVal -> {
+ NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+ // The uuid handles the case where there was a ZK server fault and the
write for this thread
+ // went through but that was not acknowledged, and we are reading our
own write for 2nd
+ // time.
+ if (!currNodeVal.isReserved() || (currNodeVal.isReserved()
+ && currNodeVal.reservation.orElseThrow().equals(reservation))) {
+ FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+ // Add the FateReservation to the node to reserve
+ return new NodeValue(currNodeVal.status, reservation,
currFateKey).serialize();
+ } else {
+ // This will not change the value to null but will return null
+ return null;
+ }
+ });
+ if (newSerNodeVal != null) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else {
+ return Optional.empty();
+ }
+ } catch (InterruptedException | KeeperException |
AcceptableThriftTableOperationException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public boolean isReserved(FateId fateId) {
+ return getNode(fateId).isReserved();
+ }
+
+ @Override
+ public Map<FateId,FateReservation> getActiveReservations() {
+ Map<FateId,FateReservation> activeReservations = new HashMap<>();
+
+ try {
+ for (String strTxId : zk.getChildren(path)) {
+ String txUUIDStr = strTxId.split("_")[1];
+ FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
+ if (isReserved(fateId)) {
+ FateReservation reservation =
getNode(fateId).reservation.orElseThrow();
Review Comment:
Addressed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]