kevinrr888 commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1700589603
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -120,35 +127,161 @@ public FateId getFateId() {
}
@Override
- protected void create(FateId fateId, FateKey fateKey) {
- final int maxAttempts = 5;
-
+ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+ final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+ final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+ Optional<FateTxStore<T>> txStore = Optional.empty();
+ int maxAttempts = 5;
+ FateMutator.Status status = null;
+
+ // We first need to write the initial/unreserved value for the reservation
column
+ // Only need to retry if it is UNKNOWN
for (int attempt = 0; attempt < maxAttempts; attempt++) {
-
- if (attempt >= 1) {
- log.debug("Failed to create transaction with fateId {} and fateKey {},
trying again",
- fateId, fateKey);
- UtilWaitThread.sleep(100);
+ status = newMutator(fateId).putInitReservationVal().tryMutate();
+ if (status != FateMutator.Status.UNKNOWN) {
+ break;
}
+ UtilWaitThread.sleep(100);
+ }
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
+
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
+ if (status != FateMutator.Status.UNKNOWN) {
+ break;
+ }
+ UtilWaitThread.sleep(100);
+ }
- var status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
- .putCreateTime(System.currentTimeMillis()).tryMutate();
+ switch (status) {
+ case ACCEPTED:
+ txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+ break;
+ case REJECTED:
+ // If the status is REJECTED, we need to check what about the mutation
was REJECTED:
+ // 1) Possible something like the following occurred:
+ // the first attempt was UNKNOWN but written, the next attempt would
be rejected
+ // We return the FateTxStore in this case.
+ // 2) If there is a collision with existing fate id, throw error
+ // 3) If the fate id is already reserved, return an empty optional
+ // 4) If the fate id is still NEW/unseeded and unreserved, we can try
to reserve it
+ try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRange(getRow(fateId));
+ scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+ TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+ scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
+ TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
+
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+ TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+ TStatus statusSeen = TStatus.UNKNOWN;
+ Optional<FateKey> fateKeySeen = Optional.empty();
+ Optional<FateReservation> reservationSeen = Optional.empty();
+
+ for (Entry<Key,Value> entry :
scanner.stream().collect(Collectors.toList())) {
+ Text colf = entry.getKey().getColumnFamily();
+ Text colq = entry.getKey().getColumnQualifier();
+ Value val = entry.getValue();
+
+ switch (colq.toString()) {
+ case TxColumnFamily.STATUS:
+ statusSeen = TStatus.valueOf(val.toString());
+ break;
+ case TxColumnFamily.TX_KEY:
+ fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
+ break;
+ case TxColumnFamily.RESERVATION:
+ if (FateReservation.isFateReservation(val.get())) {
+ reservationSeen =
Optional.of(FateReservation.deserialize(val.get()));
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected column seen: " +
colf + ":" + colq);
+ }
+ }
- switch (status) {
- case ACCEPTED:
- return;
- case UNKNOWN:
- continue;
- case REJECTED:
- throw new IllegalStateException("Attempt to create transaction with
fateId " + fateId
- + " and fateKey " + fateKey + " was rejected");
- default:
- throw new IllegalStateException("Unknown status " + status);
+ // This will be the case if the mutation status is REJECTED but the
mutation was written
+ if (statusSeen == TStatus.NEW && reservationSeen.isPresent()
+ && reservationSeen.orElseThrow().equals(reservation)) {
+ verifyFateKey(fateId, fateKeySeen, fateKey);
+ txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else if (statusSeen == TStatus.NEW && reservationSeen.isEmpty()) {
+ verifyFateKey(fateId, fateKeySeen, fateKey);
+ // NEW/unseeded transaction and not reserved, so we can allow it
to be reserved
+ // we tryReserve() since another thread may have reserved it since
the scan
+ txStore = tryReserve(fateId);
Review Comment:
Added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]