keith-turner commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1696006961
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -104,19 +109,108 @@ public FateId create() {
}
@Override
- protected void create(FateId fateId, FateKey key) {
+ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+ final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+ final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+
try {
- zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW,
key).serialize(),
- NodeExistsPolicy.FAIL);
- } catch (KeeperException | InterruptedException e) {
+ byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId),
+ new NodeValue(TStatus.NEW, reservation, fateKey).serialize(),
currSerNodeVal -> {
+ // We are only returning a non-null value for the following cases:
+ // 1) The existing NodeValue for fateId is exactly the same as the
value set for the
+ // node if it doesn't yet exist:
+ // TStatus = TStatus.NEW, FateReservation = reservation, FateKey =
fateKey
+ // This might occur if there was a ZK server fault and the same
write is running a 2nd
+ // time
+ // 2) The existing NodeValue for fateId has:
+ // TStatus = TStatus.NEW, no FateReservation present, FateKey =
fateKey
+ // The fateId is NEW/unseeded and not reserved, so we can allow it
to be reserved
+ NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+ if (currNodeVal.status == TStatus.NEW &&
currNodeVal.isReservedBy(reservation)) {
+ verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+ return currSerNodeVal;
+ } else if (currNodeVal.status == TStatus.NEW &&
!currNodeVal.isReserved()) {
+ verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+ // NEW/unseeded transaction and not reserved, so we can allow it
to be reserved
+ return new NodeValue(TStatus.NEW, reservation,
fateKey).serialize();
+ } else {
+ log.trace(
+ "fate id {} tstatus {} fate key {} is reserved {} is either
currently reserved "
+ + "or has already been seeded with work (non-NEW
status), or both",
+ fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null),
+ currNodeVal.isReserved());
+ // This will not change the value to null but will return null
+ return null;
+ }
+ });
+ if (nodeVal != null) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else {
+ return Optional.empty();
+ }
+ } catch (InterruptedException | KeeperException |
AcceptableThriftTableOperationException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+ // uniquely identify this attempt to reserve the fate operation data
+ FateReservation reservation = FateReservation.from(lockID,
UUID.randomUUID());
+
+ try {
+ byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId),
currSerNodeVal -> {
Review Comment:
It's possible nothing exists for this fateId in ZooKeeper. Looking at the
impl of `mutateExisting`, it calls
[Zookeeper.getData()](https://zookeeper.apache.org/doc/r3.9.2/apidocs/zookeeper-server/org/apache/zookeeper/ZooKeeper.html#getData(java.lang.String,boolean,org.apache.zookeeper.data.Stat))
which will throw a NoNodeException for this case. Looking at the usage of
`mutateExisting`, it seems like it's currently only used in places where we expect
the node to always exist. This case is a bit different, because the fate id
could be deleted. So we could add the following to the try/catch.
```java
}catch(KeeperException.NoNodeException nne) {
//TODO log trace?
return Optional.empty();
```
Also, maybe we can test this case of attempting to reserve something that does
not exist in the ITs.
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -104,19 +109,108 @@ public FateId create() {
}
@Override
- protected void create(FateId fateId, FateKey key) {
+ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+ final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+ final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+
try {
- zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW,
key).serialize(),
- NodeExistsPolicy.FAIL);
- } catch (KeeperException | InterruptedException e) {
+ byte[] nodeVal = zk.mutateOrCreate(getTXPath(fateId),
+ new NodeValue(TStatus.NEW, reservation, fateKey).serialize(),
currSerNodeVal -> {
+ // We are only returning a non-null value for the following cases:
+ // 1) The existing NodeValue for fateId is exactly the same as the
value set for the
+ // node if it doesn't yet exist:
+ // TStatus = TStatus.NEW, FateReservation = reservation, FateKey =
fateKey
+ // This might occur if there was a ZK server fault and the same
write is running a 2nd
+ // time
+ // 2) The existing NodeValue for fateId has:
+ // TStatus = TStatus.NEW, no FateReservation present, FateKey =
fateKey
+ // The fateId is NEW/unseeded and not reserved, so we can allow it
to be reserved
+ NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+ if (currNodeVal.status == TStatus.NEW &&
currNodeVal.isReservedBy(reservation)) {
+ verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+ return currSerNodeVal;
+ } else if (currNodeVal.status == TStatus.NEW &&
!currNodeVal.isReserved()) {
+ verifyFateKey(fateId, currNodeVal.fateKey, fateKey);
+ // NEW/unseeded transaction and not reserved, so we can allow it
to be reserved
+ return new NodeValue(TStatus.NEW, reservation,
fateKey).serialize();
+ } else {
+ log.trace(
+ "fate id {} tstatus {} fate key {} is reserved {} is either
currently reserved "
+ + "or has already been seeded with work (non-NEW
status), or both",
+ fateId, currNodeVal.status, currNodeVal.fateKey.orElse(null),
+ currNodeVal.isReserved());
+ // This will not change the value to null but will return null
+ return null;
+ }
+ });
+ if (nodeVal != null) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else {
+ return Optional.empty();
+ }
+ } catch (InterruptedException | KeeperException |
AcceptableThriftTableOperationException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+ // uniquely identify this attempt to reserve the fate operation data
+ FateReservation reservation = FateReservation.from(lockID,
UUID.randomUUID());
+
+ try {
+ byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId),
currSerNodeVal -> {
+ NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+ // The uuid handles the case where there was a ZK server fault and the
write for this thread
+ // went through but that was not acknowledged, and we are reading our
own write for 2nd
+ // time.
+ if (!currNodeVal.isReserved() ||
currNodeVal.isReservedBy(reservation)) {
+ FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+ // Add the FateReservation to the node to reserve
+ return new NodeValue(currNodeVal.status, reservation,
currFateKey).serialize();
+ } else {
+ // This will not change the value to null but will return null
+ return null;
+ }
+ });
+ if (newSerNodeVal != null) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else {
+ return Optional.empty();
+ }
+ } catch (InterruptedException | KeeperException |
AcceptableThriftTableOperationException e) {
throw new IllegalStateException(e);
}
}
@Override
- protected Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId) {
- final NodeValue node = getNode(fateId);
- return new Pair<>(node.status, node.fateKey);
+ public void deleteDeadReservations() {
+ for (Map.Entry<FateId,FateReservation> entry :
getActiveReservations().entrySet()) {
+ FateId fateId = entry.getKey();
+ FateReservation reservation = entry.getValue();
+ if (isLockHeld.test(reservation.getLockID())) {
+ continue;
+ }
+ try {
+ zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+ NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+ // Make sure the current node is still reserved and reserved with
the expected reservation
+ // and it is dead
+ if (currNodeVal.isReservedBy(reservation)
+ &&
!isLockHeld.test(currNodeVal.reservation.orElseThrow().getLockID())) {
+ // Delete the reservation
+ log.trace("Deleted the dead reservation {} for fate id {}",
reservation, fateId);
+ return new NodeValue(currNodeVal.status, null,
currNodeVal.fateKey.orElse(null))
+ .serialize();
+ } else {
+ // No change
+ return null;
+ }
+ });
+ } catch (KeeperException | InterruptedException |
AcceptableThriftTableOperationException e) {
Review Comment:
We could catch NoNodeException here and just ignore it; it means the node was
deleted between when we read it and when we tried to act on it.
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -363,13 +498,23 @@ protected Stream<FateIdStatus>
getTransactions(Set<TStatus> statuses) {
Stream<FateIdStatus> stream = zk.getChildren(path).stream().map(strTxid
-> {
String txUUIDStr = strTxid.split("_")[1];
FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
- // Memoizing for two reasons. First the status may never be requested,
so in that case avoid
- // the lookup. Second, if its requested multiple times the result will
always be consistent.
- Supplier<TStatus> statusSupplier = Suppliers.memoize(() ->
_getStatus(fateId));
+ // Memoizing for two reasons. First the status or reservation may
never be requested, so
+ // in that case avoid the lookup. Second, if it's requested multiple
times the result will
+ // always be consistent.
+ Supplier<Pair<TStatus,Optional<FateReservation>>> statusAndResSupplier
=
+ Suppliers.memoize(() -> {
+ NodeValue zkNode = getNode(fateId);
+ return new Pair<>(zkNode.status, zkNode.reservation);
+ });
Review Comment:
Seems like the Pair could be dropped and this could be simplified to return
the NodeValue
```suggestion
Supplier<NodeValue> statusAndResSupplier =
Suppliers.memoize(() -> getNode(fateId));
```
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -120,35 +127,161 @@ public FateId getFateId() {
}
@Override
- protected void create(FateId fateId, FateKey fateKey) {
- final int maxAttempts = 5;
-
+ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+ final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+ final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+ Optional<FateTxStore<T>> txStore = Optional.empty();
+ int maxAttempts = 5;
+ FateMutator.Status status = null;
+
+ // We first need to write the initial/unreserved value for the reservation
column
+ // Only need to retry if it is UNKNOWN
for (int attempt = 0; attempt < maxAttempts; attempt++) {
-
- if (attempt >= 1) {
- log.debug("Failed to create transaction with fateId {} and fateKey {},
trying again",
- fateId, fateKey);
- UtilWaitThread.sleep(100);
+ status = newMutator(fateId).putInitReservationVal().tryMutate();
+ if (status != FateMutator.Status.UNKNOWN) {
+ break;
}
+ UtilWaitThread.sleep(100);
+ }
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
Review Comment:
The way this code is structured, it's writing two mutations. I'm wondering if it
could write everything in a single conditional mutation. The conditional
mutation would have the following:
* condition that reservation column is absent
* condition that status column is absent
* set status column to NEW
* set reservation column to expected reservation
* set key column
* set time column
I suspect the way it is structured has to do with NOT_RESERVED, but I do
not understand why it needs to be this way.
##########
core/src/main/java/org/apache/accumulo/core/fate/user/FateStatusFilter.java:
##########
@@ -54,8 +55,15 @@ public void init(SortedKeyValueIterator<Key,Value> source,
Map<String,String> op
@Override
public boolean accept(Key k, Value v) {
- var tstatus = ReadOnlyFateStore.TStatus.valueOf(v.toString());
- return valuesToAccept.contains(tstatus);
+ // We may see TStatus values or FateReservation values with how this
filter is used,
+ // only accept TStatus values, return false on FateReservation values,
error otherwise
+ try {
+ var tstatus = ReadOnlyFateStore.TStatus.valueOf(v.toString());
+ return valuesToAccept.contains(tstatus);
+ } catch (IllegalArgumentException e) {
Review Comment:
This iterator may need to be reworked. The iterator should probably return
the status and reservation columns for a row only if the status matches. This
iterator is looking at individual key values, so it cannot consider the higher
level of the row. We could do the following:
* Make this class extend WholeRowIterator instead
* Override the `boolean filter(Text currentRow, List<Key> keys, List<Value>
values)` method in WholeRowIterator and make it look for the status column and
check its value to determine whether to keep the row.
* Change the `configureScanner` method on this class to either set up this
filter or just a WholeRowIterator w/o filtering when all statuses are
requested.
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +322,20 @@ public void setStatus(TStatus status) {
verifyReserved(true);
try {
- zk.putPersistentData(getTXPath(fateId), new
NodeValue(status).serialize(),
- NodeExistsPolicy.OVERWRITE);
- } catch (KeeperException | InterruptedException e) {
+ zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+ NodeValue currNodeVal = new NodeValue(currSerializedData);
+ // Ensure the FateId is reserved in ZK, and it is reserved with the
expected reservation
+ if (currNodeVal.isReservedBy(this.reservation)) {
+ FateReservation currFateReservation =
currNodeVal.reservation.orElseThrow();
+ FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+ NodeValue newNodeValue = new NodeValue(status,
currFateReservation, currFateKey);
+ return newNodeValue.serialize();
+ } else {
+ throw new IllegalStateException("Either the FateId is not reserved
in ZK, or it is"
+ + " but the reservation in ZK differs from that in the
store.");
+ }
+ });
+ } catch (KeeperException | InterruptedException |
AcceptableThriftTableOperationException e) {
Review Comment:
For this one we expect the node to exist in ZooKeeper, so if we see a
NoNodeException here it's ok to throw an exception.
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -120,35 +127,161 @@ public FateId getFateId() {
}
@Override
- protected void create(FateId fateId, FateKey fateKey) {
- final int maxAttempts = 5;
-
+ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+ final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+ final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+ Optional<FateTxStore<T>> txStore = Optional.empty();
+ int maxAttempts = 5;
+ FateMutator.Status status = null;
+
+ // We first need to write the initial/unreserved value for the reservation
column
+ // Only need to retry if it is UNKNOWN
for (int attempt = 0; attempt < maxAttempts; attempt++) {
-
- if (attempt >= 1) {
- log.debug("Failed to create transaction with fateId {} and fateKey {},
trying again",
- fateId, fateKey);
- UtilWaitThread.sleep(100);
+ status = newMutator(fateId).putInitReservationVal().tryMutate();
+ if (status != FateMutator.Status.UNKNOWN) {
+ break;
}
+ UtilWaitThread.sleep(100);
+ }
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
+
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
+ if (status != FateMutator.Status.UNKNOWN) {
+ break;
+ }
+ UtilWaitThread.sleep(100);
+ }
- var status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
- .putCreateTime(System.currentTimeMillis()).tryMutate();
+ switch (status) {
+ case ACCEPTED:
+ txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+ break;
+ case REJECTED:
+ // If the status is REJECTED, we need to check what about the mutation
was REJECTED:
+ // 1) Possible something like the following occurred:
+ // the first attempt was UNKNOWN but written, the next attempt would
be rejected
+ // We return the FateTxStore in this case.
+ // 2) If there is a collision with existing fate id, throw error
+ // 3) If the fate id is already reserved, return an empty optional
+ // 4) If the fate id is still NEW/unseeded and unreserved, we can try
to reserve it
+ try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRange(getRow(fateId));
+ scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+ TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+ scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
+ TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
+
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+ TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+ TStatus statusSeen = TStatus.UNKNOWN;
+ Optional<FateKey> fateKeySeen = Optional.empty();
+ Optional<FateReservation> reservationSeen = Optional.empty();
+
+ for (Entry<Key,Value> entry :
scanner.stream().collect(Collectors.toList())) {
Review Comment:
```suggestion
for (Entry<Key,Value> entry : scanner) {
```
##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -67,27 +76,36 @@ public FateId fromTypeAndKey(FateInstanceType instanceType,
FateKey fateKey) {
}
};
- protected final Set<FateId> reserved;
+ // The ZooKeeper lock for the process that's running this store instance
+ protected final ZooUtil.LockID lockID;
+ protected final Predicate<ZooUtil.LockID> isLockHeld;
protected final Map<FateId,NanoTime> deferred;
+ protected final FateIdGenerator fateIdGenerator;
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();
- private final FateIdGenerator fateIdGenerator;
-
- // This is incremented each time a transaction was unreserved that was non
new
- protected final SignalCount unreservedNonNewCount = new SignalCount();
// This is incremented each time a transaction is unreserved that was
runnable
- protected final SignalCount unreservedRunnableCount = new SignalCount();
+ private final SignalCount unreservedRunnableCount = new SignalCount();
+
+ // Keeps track of the number of concurrent callers to waitForStatusChange()
+ private final AtomicInteger concurrentStatusChangeCallers = new
AtomicInteger(0);
public AbstractFateStore() {
- this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
+ this(null, null, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
}
- public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
+ public AbstractFateStore(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID>
isLockHeld) {
+ this(lockID, isLockHeld, DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
+ }
+
+ public AbstractFateStore(ZooUtil.LockID lockID, Predicate<ZooUtil.LockID>
isLockHeld,
+ int maxDeferred, FateIdGenerator fateIdGenerator) {
this.maxDeferred = maxDeferred;
this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
- this.reserved = new HashSet<>();
- this.deferred = new HashMap<>();
+ this.deferred = Collections.synchronizedMap(new HashMap<>());
+ this.lockID = Objects.requireNonNullElseGet(lockID,
AbstractFateStore::createDummyLockID);
Review Comment:
Falling back to calling createDummyLockID here could hide bugs. We could
instead have any caller of this constructor that does not have a lock ID call
`AbstractFateStore.createDummyLockID()` itself. This removes the ambiguity of
whether null was intentional or unintentional. This could be a follow-on issue.
```suggestion
this.lockID = Objects.requireNonNull(lockID);
```
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -421,14 +569,30 @@ private Optional<FateKey>
deserializeFateKey(DataInputBuffer buffer) throws IOEx
return Optional.empty();
}
+ private Optional<FateReservation>
deserializeFateReservation(DataInputBuffer buffer)
+ throws IOException {
+ int length = buffer.readInt();
+ if (length > 0) {
Review Comment:
If negative values are not expected, could add a check for that.
```suggestion
int length = buffer.readInt();
Preconditions.checkArgument(length >=0);
if (length > 0) {
```
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -120,35 +127,161 @@ public FateId getFateId() {
}
@Override
- protected void create(FateId fateId, FateKey fateKey) {
- final int maxAttempts = 5;
-
+ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+ final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+ final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+ Optional<FateTxStore<T>> txStore = Optional.empty();
+ int maxAttempts = 5;
+ FateMutator.Status status = null;
+
+ // We first need to write the initial/unreserved value for the reservation
column
+ // Only need to retry if it is UNKNOWN
for (int attempt = 0; attempt < maxAttempts; attempt++) {
-
- if (attempt >= 1) {
- log.debug("Failed to create transaction with fateId {} and fateKey {},
trying again",
- fateId, fateKey);
- UtilWaitThread.sleep(100);
+ status = newMutator(fateId).putInitReservationVal().tryMutate();
+ if (status != FateMutator.Status.UNKNOWN) {
+ break;
}
+ UtilWaitThread.sleep(100);
+ }
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
+
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
+ if (status != FateMutator.Status.UNKNOWN) {
+ break;
+ }
+ UtilWaitThread.sleep(100);
+ }
- var status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
- .putCreateTime(System.currentTimeMillis()).tryMutate();
+ switch (status) {
+ case ACCEPTED:
+ txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+ break;
+ case REJECTED:
+ // If the status is REJECTED, we need to check what about the mutation
was REJECTED:
+ // 1) Possible something like the following occurred:
+ // the first attempt was UNKNOWN but written, the next attempt would
be rejected
+ // We return the FateTxStore in this case.
+ // 2) If there is a collision with existing fate id, throw error
+ // 3) If the fate id is already reserved, return an empty optional
+ // 4) If the fate id is still NEW/unseeded and unreserved, we can try
to reserve it
+ try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRange(getRow(fateId));
+ scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+ TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+ scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
+ TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
+
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+ TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+ TStatus statusSeen = TStatus.UNKNOWN;
+ Optional<FateKey> fateKeySeen = Optional.empty();
+ Optional<FateReservation> reservationSeen = Optional.empty();
+
+ for (Entry<Key,Value> entry :
scanner.stream().collect(Collectors.toList())) {
+ Text colf = entry.getKey().getColumnFamily();
+ Text colq = entry.getKey().getColumnQualifier();
+ Value val = entry.getValue();
+
+ switch (colq.toString()) {
+ case TxColumnFamily.STATUS:
+ statusSeen = TStatus.valueOf(val.toString());
+ break;
+ case TxColumnFamily.TX_KEY:
+ fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
+ break;
+ case TxColumnFamily.RESERVATION:
+ if (FateReservation.isFateReservation(val.get())) {
+ reservationSeen =
Optional.of(FateReservation.deserialize(val.get()));
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected column seen: " +
colf + ":" + colq);
+ }
+ }
- switch (status) {
- case ACCEPTED:
- return;
- case UNKNOWN:
- continue;
- case REJECTED:
- throw new IllegalStateException("Attempt to create transaction with
fateId " + fateId
- + " and fateKey " + fateKey + " was rejected");
- default:
- throw new IllegalStateException("Unknown status " + status);
+ // This will be the case if the mutation status is REJECTED but the
mutation was written
+ if (statusSeen == TStatus.NEW && reservationSeen.isPresent()
+ && reservationSeen.orElseThrow().equals(reservation)) {
+ verifyFateKey(fateId, fateKeySeen, fateKey);
+ txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else if (statusSeen == TStatus.NEW && reservationSeen.isEmpty()) {
+ verifyFateKey(fateId, fateKeySeen, fateKey);
+ // NEW/unseeded transaction and not reserved, so we can allow it
to be reserved
+ // we tryReserve() since another thread may have reserved it since
the scan
+ txStore = tryReserve(fateId);
Review Comment:
```suggestion
txStore = tryReserve(fateId);
// the status was known before reserving to be NEW, however it
could change so check after reserving to avoid race conditions.
var statusAfterReserve =
txStore.map(ReadOnlyFateTxStore::getStatus).orElse(TStatus.UNKNOWN);
if (statusAfterReserve != TStatus.NEW) {
txStore.ifPresent(txs->txs.unreserve(Duration.ZERO));
txStore = Optional.empty();
}
```
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +322,20 @@ public void setStatus(TStatus status) {
verifyReserved(true);
try {
- zk.putPersistentData(getTXPath(fateId), new
NodeValue(status).serialize(),
- NodeExistsPolicy.OVERWRITE);
- } catch (KeeperException | InterruptedException e) {
+ zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+ NodeValue currNodeVal = new NodeValue(currSerializedData);
+ // Ensure the FateId is reserved in ZK, and it is reserved with the
expected reservation
+ if (currNodeVal.isReservedBy(this.reservation)) {
+ FateReservation currFateReservation =
currNodeVal.reservation.orElseThrow();
+ FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+ NodeValue newNodeValue = new NodeValue(status,
currFateReservation, currFateKey);
+ return newNodeValue.serialize();
+ } else {
+ throw new IllegalStateException("Either the FateId is not reserved
in ZK, or it is"
Review Comment:
For this exception it would be useful to include the fateId. In general,
including the fate id in any exception message could be very helpful for
debugging problems. This could be a follow-on issue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]