keith-turner commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1671222699
##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -114,38 +131,26 @@ public static Object deserialize(byte[] ser) {
}
}
- /**
- * Attempt to reserve the fate transaction.
- *
- * @param fateId The FateId
- * @return An Optional containing the FateTxStore if the transaction was
successfully reserved, or
- * an empty Optional if the transaction was already reserved.
- */
- @Override
- public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
- synchronized (this) {
- if (!reserved.contains(fateId)) {
- return Optional.of(reserve(fateId));
- }
- return Optional.empty();
- }
- }
-
@Override
public FateTxStore<T> reserve(FateId fateId) {
- synchronized (AbstractFateStore.this) {
- while (reserved.contains(fateId)) {
- try {
- AbstractFateStore.this.wait(100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IllegalStateException(e);
- }
+ Preconditions.checkState(!_getStatus(fateId).equals(TStatus.UNKNOWN),
+ "Attempted to reserve a tx that does not exist: " + fateId);
Review Comment:
Something could exist and be reserved when we enter the method, then be
deleted while we are waiting to reserve it. This check does an RPC to read
data, and then tryReserve does another RPC. For efficiency and correctness, it
would be better to do this check only when tryReserve returns empty. If the
check is done in the loop, it handles the case of something being deleted
while waiting. Also, if the check is done in the loop, the 2nd RPC is never
made when tryReserve succeeds.
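A minimal sketch of the loop shape described above, assuming an in-memory stand-in for the store. `ReserveLoopSketch`, its two sets, and the `exists` helper are hypothetical and only illustrate the ordering of the calls, not the real AbstractFateStore code:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

public class ReserveLoopSketch {
  // Hypothetical stand-ins for persisted state; a real store would do an RPC.
  private final Set<String> existing = new HashSet<>();
  private final Set<String> reserved = new HashSet<>();

  public synchronized void create(String id) {
    existing.add(id);
  }

  public synchronized void delete(String id) {
    existing.remove(id);
    reserved.remove(id);
  }

  // Stands in for the first RPC: reserve only if the tx exists and is free.
  public synchronized Optional<String> tryReserve(String id) {
    if (existing.contains(id) && reserved.add(id)) {
      return Optional.of(id);
    }
    return Optional.empty();
  }

  // Stands in for the second RPC (the status read).
  public synchronized boolean exists(String id) {
    return existing.contains(id);
  }

  public String reserve(String id) {
    while (true) {
      Optional<String> attempt = tryReserve(id);
      if (attempt.isPresent()) {
        return attempt.get(); // success: the existence check is never made
      }
      // Only after a failed attempt, check whether the tx still exists.
      // This also catches a tx deleted while we were waiting.
      if (!exists(id)) {
        throw new IllegalStateException(
            "Attempted to reserve a tx that does not exist: " + id);
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
  }
}
```

With this ordering, the common case (an unreserved, existing tx) costs a single call.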
##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -361,90 +356,108 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey
fateKey) {
protected abstract Optional<FateKey> getKey(FateId fateId);
- protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean
isReserved);
-
- protected abstract FateInstanceType getInstanceType();
+ protected abstract FateTxStore<T> newUnreservedFateTxStore(FateId fateId);
protected abstract class AbstractFateTxStoreImpl<T> implements
FateTxStore<T> {
protected final FateId fateId;
- protected final boolean isReserved;
+ protected boolean deleted;
+ protected FateReservation reservation;
protected TStatus observedStatus = null;
- protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) {
+ protected AbstractFateTxStoreImpl(FateId fateId) {
+ this.fateId = fateId;
+ this.deleted = false;
+ this.reservation = null;
+ }
+
+ protected AbstractFateTxStoreImpl(FateId fateId, FateReservation
reservation) {
this.fateId = fateId;
- this.isReserved = isReserved;
+ this.deleted = false;
+ this.reservation = Objects.requireNonNull(reservation);
+ }
+
+ protected boolean isReserved() {
+ return this.reservation != null;
}
@Override
public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
- Preconditions.checkState(!isReserved,
- "Attempted to wait for status change while reserved " + fateId);
- while (true) {
+ Preconditions.checkState(!isReserved(),
+ "Attempted to wait for status change while reserved: " + fateId);
+ verifyReserved(false);
- long countBefore = unreservedNonNewCount.getCount();
+ int currNumCallers = concurrentStatusChangeCallers.incrementAndGet();
Review Comment:
This change makes the polling more efficient in the case of many things
waiting for status change. All the code following this should be placed in a
try/finally, something like the following:
```java
try {
// polling loop
} finally {
concurrentStatusChangeCallers.decrementAndGet();
}
```
This ensures that it's decremented even when an exception happens.
One thing that is being lost in this change is the in-memory signalling.
Not something for this PR, but it would be good to have something that does
some in-memory signalling and is efficient with the polling as the number of
pollers increases. The less latency there is in the impl of the method, the
faster things like create table, bulk import, etc. will complete from the user
API perspective. The in-memory signalling can help reduce latency.
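A small self-contained sketch of the try/finally point, assuming a hypothetical `StatusPollSketch` class and a `BooleanSupplier` that stands in for the status read. It only shows that the caller count returns to zero when the polled read throws:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class StatusPollSketch {
  // Same name as the counter in the diff; everything else is hypothetical.
  static final AtomicInteger concurrentStatusChangeCallers = new AtomicInteger();

  // Polls until done reports true; returns how many polls came back false.
  static int pollUntil(BooleanSupplier done) {
    concurrentStatusChangeCallers.incrementAndGet();
    try {
      int polls = 0;
      while (!done.getAsBoolean()) {
        polls++;
      }
      return polls;
    } finally {
      // Runs on normal return and on any exception thrown by done.
      concurrentStatusChangeCallers.decrementAndGet();
    }
  }
}
```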
##########
core/src/main/java/org/apache/accumulo/core/fate/FateStore.java:
##########
@@ -107,11 +118,155 @@ interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
void unreserve(Duration deferTime);
}
+ /**
+ * The value stored to indicate a FATE transaction ID ({@link FateId}) has
been reserved
+ */
+ class FateReservation {
+
+ // The LockID (provided by the Manager running the FATE which uses this
store) which is used for
+ // identifying dead Managers, so their reservations can be deleted and
picked up again since
+ // they can no longer be worked on.
+ private final ZooUtil.LockID lockID; // TODO 4131 not sure if this is the
best type for this
+ // The UUID generated on a reservation attempt (tryReserve()) used to
uniquely identify that
+ // attempt. This is useful for the edge case where the reservation is sent
to the server
+ // (Tablet Server for UserFateStore and the ZooKeeper Server for
MetaFateStore), but the server
+ // dies before the store receives the response. It allows us to determine
if the reservation
+ // was successful and was written by this reservation attempt (could have
been successfully
+ // reserved by another attempt or not reserved at all, in which case, we
wouldn't want to
+ // expose a FateTxStore).
+ private final UUID reservationUUID;
+ private final byte[] serialized;
+ private static final Pattern UUID_PATTERN =
+
Pattern.compile("^[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}$");
+ private static final Pattern LOCKID_PATTERN =
Pattern.compile("^.+/.+\\$[0-9a-fA-F]+$");
+
+ private FateReservation(ZooUtil.LockID lockID, UUID reservationUUID) {
+ this.lockID = Objects.requireNonNull(lockID);
+ this.reservationUUID = Objects.requireNonNull(reservationUUID);
+ this.serialized = serialize(lockID, reservationUUID);
+ }
+
+ public static FateReservation from(ZooUtil.LockID lockID, UUID
reservationUUID) {
+ return new FateReservation(lockID, reservationUUID);
+ }
+
+ public static FateReservation from(byte[] serialized) {
+ try (DataInputBuffer buffer = new DataInputBuffer()) {
+ buffer.reset(serialized, serialized.length);
+ ZooUtil.LockID lockID = new ZooUtil.LockID("", buffer.readUTF());
+ UUID reservationUUID = UUID.fromString(buffer.readUTF());
+ return new FateReservation(lockID, reservationUUID);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ public static FateReservation from(String fateReservationStr) {
+ if (isFateReservation(fateReservationStr)) {
+ String[] fields = fateReservationStr.split(":");
+ ZooUtil.LockID lockId = new ZooUtil.LockID("", fields[0]);
+ UUID reservationUUID = UUID.fromString(fields[1]);
+ return new FateReservation(lockId, reservationUUID);
+ } else {
+ throw new IllegalArgumentException(
+ "Tried to create a FateReservation from an invalid string: " +
fateReservationStr);
+ }
+ }
+
+ /**
+ *
+ * @param fateReservationStr the string from a call to {@link
FateReservation#toString()}
+ * @return true if the string represents a valid FateReservation object,
false otherwise
+ */
+ public static boolean isFateReservation(String fateReservationStr) {
+ if (fateReservationStr != null) {
+ String[] fields = fateReservationStr.split(":");
+ if (fields.length == 2) {
+ return LOCKID_PATTERN.matcher(fields[0]).matches()
+ && UUID_PATTERN.matcher(fields[1]).matches();
Review Comment:
Could use UuidUtil.isUUID here
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -106,13 +112,95 @@ public FateId create() {
@Override
protected void create(FateId fateId, FateKey key) {
try {
- zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW,
key).serialize(),
+ zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null,
key).serialize(),
NodeExistsPolicy.FAIL);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
+ @Override
+ public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+ // uniquely identify this attempt to reserve the fate operation data
+ FateReservation reservation = FateReservation.from(lockID,
UUID.randomUUID());
+
+ try {
+ byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId),
currSerNodeVal -> {
+ NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+ // The uuid handles the case where there was a ZK server fault and the
write for this thread
+ // went through but that was not acknowledged, and we are reading our
own write for 2nd
+ // time.
+ if (!currNodeVal.isReserved() || (currNodeVal.isReserved()
+ && currNodeVal.reservation.orElseThrow().equals(reservation))) {
+ FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+ // Add the FateReservation to the node to reserve
+ return new NodeValue(currNodeVal.status, reservation,
currFateKey).serialize();
+ } else {
+ // This will not change the value to null but will return null
+ return null;
+ }
+ });
+ if (newSerNodeVal != null) {
+ return Optional.of(new FateTxStoreImpl(fateId, reservation));
+ } else {
+ return Optional.empty();
+ }
+ } catch (InterruptedException | KeeperException |
AcceptableThriftTableOperationException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ public boolean isReserved(FateId fateId) {
+ return getNode(fateId).isReserved();
+ }
+
+ @Override
+ public Map<FateId,FateReservation> getActiveReservations() {
+ Map<FateId,FateReservation> activeReservations = new HashMap<>();
+
+ try {
+ for (String strTxId : zk.getChildren(path)) {
+ String txUUIDStr = strTxId.split("_")[1];
+ FateId fateId = FateId.from(fateInstanceType, txUUIDStr);
+ if (isReserved(fateId)) {
+ FateReservation reservation =
getNode(fateId).reservation.orElseThrow();
+ activeReservations.put(fateId, reservation);
+ }
+ }
+ } catch (KeeperException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ return activeReservations;
+ }
+
+ @Override
+ public void deleteDeadReservations() {
+ for (Map.Entry<FateId,FateReservation> entry :
getActiveReservations().entrySet()) {
+ FateId fateId = entry.getKey();
+ FateReservation reservation = entry.getValue();
Review Comment:
When the lock is still held, a check like the following avoids calling
`zk.mutateExisting` and reading from ZooKeeper.
```suggestion
FateReservation reservation = entry.getValue();
if(isLockHeld(reservation.getLockID())) {
continue;
}
```
##########
core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java:
##########
@@ -175,6 +176,21 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey
fateKey) {
}
return txStore;
}
+
+ @Override
+ public boolean isReserved(FateId fateId) {
+ return store.isReserved(fateId);
+ }
+
+ @Override
+ public Map<FateId,FateReservation> getActiveReservations() {
+ return store.getActiveReservations();
+ }
+
+ @Override
+ public void deleteDeadReservations() {
+ store.deleteDeadReservations();
Review Comment:
Probably would not go here, but it would be nice to have consistent logging
somehow for each dead reservation that is detected and deleted.
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -55,7 +60,7 @@
//TODO use zoocache? - ACCUMULO-1297
//TODO handle zookeeper being down gracefully - ACCUMULO-1297
-
+// TODO 4131 noticed this class is not in the fate.zookeeper package. Should
it be?
Review Comment:
Yeah it probably should be. That would be consistent with UserFateStore,
which is in a sub package.
##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +316,21 @@ public void setStatus(TStatus status) {
verifyReserved(true);
try {
- zk.putPersistentData(getTXPath(fateId), new
NodeValue(status).serialize(),
- NodeExistsPolicy.OVERWRITE);
- } catch (KeeperException | InterruptedException e) {
+ zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+ NodeValue currNodeVal = new NodeValue(currSerializedData);
+ // Ensure the FateId is reserved in ZK, and it is reserved with the
expected reservation
+ if (currNodeVal.isReserved()
+ &&
currNodeVal.reservation.orElseThrow().equals(this.reservation)) {
Review Comment:
Not sure if this change is worthwhile. I saw this pattern a few times; could
create a method in NodeValue and push the check there to shorten the code
elsewhere.
```suggestion
if (currNodeVal.isReservedBy(this.reservation)) {
```
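A rough sketch of what such a helper could look like, assuming the reservation is held as an `Optional`. `NodeValueSketch` is hypothetical and uses a `UUID` as a stand-in for `FateReservation`:

```java
import java.util.Optional;
import java.util.UUID;

public class NodeValueSketch {
  // Stand-in for the Optional<FateReservation> field seen in the diff.
  final Optional<UUID> reservation;

  NodeValueSketch(UUID reservation) {
    this.reservation = Optional.ofNullable(reservation);
  }

  boolean isReserved() {
    return reservation.isPresent();
  }

  // Folds "isReserved() && reservation.orElseThrow().equals(expected)"
  // into a single call.
  boolean isReservedBy(UUID expected) {
    return reservation.map(r -> r.equals(expected)).orElse(false);
  }
}
```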
##########
core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java:
##########
@@ -79,6 +82,33 @@ public FateMutator<T> putCreateTime(long ctime) {
return this;
}
+ @Override
+ public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+ Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED);
+ mutation.addCondition(condition);
+ TxColumnFamily.RESERVATION_COLUMN.put(mutation, new
Value(reservation.toString()));
Review Comment:
It's nice to avoid calling toString() when its return value is used as
serialized data, as the expectations around toString() are not well defined.
For example, someone could change the toString impl to improve a log message
or something like that and not realize the implications for serialized data.
```suggestion
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new
Value(reservation.getSerialized()));
```
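To illustrate the concern, here is a minimal sketch that keeps the persisted form separate from `toString()`. `ReservationSketch` is a hypothetical stand-in that uses a plain `String` for the lock id, and its byte layout is an assumption (two `writeUTF` fields, mirroring the `readUTF` calls in `from(byte[])` above):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.UUID;

public class ReservationSketch {
  final String lockId;
  final UUID uuid;
  private final byte[] serialized;

  ReservationSketch(String lockId, UUID uuid) {
    this.lockId = lockId;
    this.uuid = uuid;
    this.serialized = serialize(lockId, uuid);
  }

  private static byte[] serialize(String lockId, UUID uuid) {
    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos)) {
      dos.writeUTF(lockId);
      dos.writeUTF(uuid.toString());
      dos.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static ReservationSketch from(byte[] ser) {
    try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(ser))) {
      return new ReservationSketch(dis.readUTF(), UUID.fromString(dis.readUTF()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Stable persisted form; this is what gets written and compared.
  byte[] getSerialized() {
    return serialized.clone();
  }

  // Free to change for logging without touching stored data.
  @Override
  public String toString() {
    return "reservation[lock=" + lockId + ", uuid=" + uuid + "]";
  }
}
```

Here a change to the log-friendly `toString()` output leaves `getSerialized()` and `from(byte[])` unaffected.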
##########
core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java:
##########
@@ -79,6 +82,33 @@ public FateMutator<T> putCreateTime(long ctime) {
return this;
}
+ @Override
+ public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+ Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED);
+ mutation.addCondition(condition);
+ TxColumnFamily.RESERVATION_COLUMN.put(mutation, new
Value(reservation.toString()));
+ return this;
+ }
+
+ @Override
+ public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {
+ Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(reservation.toString());
Review Comment:
```suggestion
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(reservation.getSerialized());
```