sashapolo commented on code in PR #2761:
URL: https://github.com/apache/ignite-3/pull/2761#discussion_r1375389539
##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -95,17 +101,52 @@ protected final <T> T sync(CompletableFuture<T> fut) {
* @return Whatever the action returns.
*/
protected final <T> CompletableFuture<T> withSchemaSync(@Nullable
Transaction tx, KvAction<T> action) {
- // TODO: IGNITE-20106 - retry if our request is rejected by the server
due to a changed schema version.
+ return withSchemaSync(tx, null, action);
+ }
+ private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx,
@Nullable Integer previousSchemaVersion, KvAction<T> action) {
CompletableFuture<Integer> schemaVersionFuture = tx == null
? schemaVersions.schemaVersionAtNow(tbl.tableId())
: schemaVersions.schemaVersionAt(((InternalTransaction)
tx).startTimestamp(), tbl.tableId());
- CompletableFuture<T> future =
schemaVersionFuture.thenCompose(action::act);
+ CompletableFuture<T> future = schemaVersionFuture
Review Comment:
May I propose the following code re-arrangement to make it shorter?
```
CompletableFuture<T> future = schemaVersionFuture
.thenCompose(schemaVersion -> action.act(schemaVersion)
.handle((res, ex) -> {
if
(isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
assert tx == null : "Only for implicit transactions
a retry might be requested";
assert previousSchemaVersion == null ||
!Objects.equals(schemaVersion, previousSchemaVersion)
: "Same schema version (" + schemaVersion
+ ") on a retry: something is wrong,
is this caused by the test setup?";
// Repeat.
return withSchemaSync(tx, schemaVersion, action);
} else {
return ex == null ? completedFuture(res) :
CompletableFuture.<T>failedFuture(ex);
}
}))
.thenCompose(identity());
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -472,34 +472,30 @@ private CompletableFuture<?>
processRequest(ReplicaRequest request, @Nullable Bo
}
}
- return waitForSchemasBeforeReading(request)
- .thenCompose(unused -> validateTableExistence(request))
- .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
+ HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
+
+ return validateTableExistence(request, opTsIfDirectRo)
+ .thenCompose(unused -> validateSchemaMatch(request,
opTsIfDirectRo))
+ .thenCompose(unused -> waitForSchemasBeforeReading(request,
opTsIfDirectRo))
+ .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
}
/**
- * Makes sure that we have schemas corresponding to the moment of tx
start; this makes PK extraction safe WRT
- * {@link org.apache.ignite.internal.schema.SchemaRegistry#schema(int)}.
+ * Validates that the table exists at a timestamp corresponding to the
request operation.
*
- * @param request Request that's being processed.
+ * <ul>
+ * <li>For an RW read/write, it's 'now'</li>
Review Comment:
```suggestion
* <li>For a read/write, it's 'now'</li>
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java:
##########
@@ -95,17 +101,52 @@ protected final <T> T sync(CompletableFuture<T> fut) {
* @return Whatever the action returns.
*/
protected final <T> CompletableFuture<T> withSchemaSync(@Nullable
Transaction tx, KvAction<T> action) {
- // TODO: IGNITE-20106 - retry if our request is rejected by the server
due to a changed schema version.
+ return withSchemaSync(tx, null, action);
+ }
+ private <T> CompletableFuture<T> withSchemaSync(@Nullable Transaction tx,
@Nullable Integer previousSchemaVersion, KvAction<T> action) {
CompletableFuture<Integer> schemaVersionFuture = tx == null
? schemaVersions.schemaVersionAtNow(tbl.tableId())
: schemaVersions.schemaVersionAt(((InternalTransaction)
tx).startTimestamp(), tbl.tableId());
- CompletableFuture<T> future =
schemaVersionFuture.thenCompose(action::act);
+ CompletableFuture<T> future = schemaVersionFuture
+ .thenCompose(schemaVersion -> {
+ return action.act(schemaVersion)
+ .handle((BiFunction<T, Throwable,
CompletableFuture<T>>) (res, ex) -> {
+ if (ex != null &&
isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
+ assert tx == null : "Only for implicit
transactions a retry might be requested";
+ assert previousSchemaVersion == null ||
!Objects.equals(schemaVersion, previousSchemaVersion)
+ : "Same schema version (" +
schemaVersion
+ + ") on a retry: something
is wrong, is this caused by the test setup?";
+
+ // Repeat.
+ return withSchemaSync(tx, schemaVersion,
action);
+ }
+
+ if (ex != null) {
+ return failedFuture(ex);
+ }
+
+ return completedFuture(res);
+ });
+ })
+ .thenCompose(identity());
return convertToPublicFuture(future);
}
+ private static boolean isOrCausedBy(Class<? extends Exception>
exceptionClass, @Nullable Throwable ex) {
+ if (ex == null) {
Review Comment:
Can we replace this with `return ex != null &&
(exceptionClass.isInstance(ex) || isOrCausedBy(exceptionClass,
ex.getCause()));` ?
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/SchemaCompatValidator.java:
##########
@@ -221,4 +221,14 @@ void failIfTableDoesNotExistAt(HybridTimestamp
operationTimestamp, int tableId)
throw tableWasDroppedException(tableId);
}
}
+
+ void failIfRequestSchemaDiffersFromTxTs(HybridTimestamp txTs, int
requestSchemaVersion, int tableId) {
Review Comment:
Missing javadoc
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -472,34 +472,30 @@ private CompletableFuture<?>
processRequest(ReplicaRequest request, @Nullable Bo
}
}
- return waitForSchemasBeforeReading(request)
- .thenCompose(unused -> validateTableExistence(request))
- .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
+ HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
+
+ return validateTableExistence(request, opTsIfDirectRo)
+ .thenCompose(unused -> validateSchemaMatch(request,
opTsIfDirectRo))
+ .thenCompose(unused -> waitForSchemasBeforeReading(request,
opTsIfDirectRo))
+ .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
}
/**
- * Makes sure that we have schemas corresponding to the moment of tx
start; this makes PK extraction safe WRT
- * {@link org.apache.ignite.internal.schema.SchemaRegistry#schema(int)}.
+ * Validates that the table exists at a timestamp corresponding to the
request operation.
*
- * @param request Request that's being processed.
+ * <ul>
+ * <li>For an RW read/write, it's 'now'</li>
+ * <li>For an RO read (with readTimestamp), it's readTimestamp
(matches readTimestamp in the transaction)</li>
Review Comment:
```suggestion
* <li>For a read (with readTimestamp), it's readTimestamp (matches
readTimestamp in the transaction)</li>
```
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -2256,6 +2258,129 @@ private void testCommitRequestIfTableWasDropped(
assertThat("The transaction must have been aborted", committed.get(),
is(false));
}
+ @CartesianTest
+ @CartesianTest.MethodFactory("singleRowRwOperationTypesFactory")
+ void singleRowRwOperationsFailIfSchemaVersionMismatchesTx(RequestType
requestType, boolean onExistingRow, boolean full) {
+ RwListenerInvocation invocation = null;
+
+ if (RequestTypes.isSingleRowRwPkOnly(requestType)) {
+ invocation = (targetTxId, key) -> {
+ return doSingleRowPkRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType, full);
Review Comment:
you can use expression lambda here and in similar places
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -520,18 +518,84 @@ private CompletableFuture<HybridTimestamp>
validateTableExistence(ReplicaRequest
}
return schemaSyncService.waitForMetadataCompleteness(opStartTs)
- .thenApply(unused -> {
- schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs,
tableId());
+ .thenRun(() ->
schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
+ }
+
+ /**
+ * Makes sure that {@link
SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches
table schema version
+ * corresponding to the operation.
+ *
+ * @param request Replica request corresponding to the operation.
+ * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
+ * @return Future completed when the validation is finished.
+ */
+ private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+ if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
+ return completedFuture(null);
+ }
+
+ SchemaVersionAwareReplicaRequest versionAwareRequest =
(SchemaVersionAwareReplicaRequest) request;
Review Comment:
Looks like this variable is used at the very end, better move it there
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -472,34 +472,30 @@ private CompletableFuture<?>
processRequest(ReplicaRequest request, @Nullable Bo
}
}
- return waitForSchemasBeforeReading(request)
- .thenCompose(unused -> validateTableExistence(request))
- .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opStartTimestamp));
+ HybridTimestamp opTsIfDirectRo = (request instanceof
ReadOnlyDirectReplicaRequest) ? hybridClock.now() : null;
+
+ return validateTableExistence(request, opTsIfDirectRo)
+ .thenCompose(unused -> validateSchemaMatch(request,
opTsIfDirectRo))
+ .thenCompose(unused -> waitForSchemasBeforeReading(request,
opTsIfDirectRo))
+ .thenCompose(opStartTimestamp ->
processOperationRequest(request, isPrimary, senderId, opTsIfDirectRo));
}
/**
- * Makes sure that we have schemas corresponding to the moment of tx
start; this makes PK extraction safe WRT
- * {@link org.apache.ignite.internal.schema.SchemaRegistry#schema(int)}.
+ * Validates that the table exists at a timestamp corresponding to the
request operation.
*
- * @param request Request that's being processed.
+ * <ul>
+ * <li>For an RW read/write, it's 'now'</li>
+ * <li>For an RO read (with readTimestamp), it's readTimestamp
(matches readTimestamp in the transaction)</li>
+ * <li>For an RO direct read, it's the timestamp chosen (as 'now') to
process the request</li>
Review Comment:
```suggestion
* <li>For a direct read, it's the timestamp chosen (as 'now') to
process the request</li>
```
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -520,18 +518,84 @@ private CompletableFuture<HybridTimestamp>
validateTableExistence(ReplicaRequest
}
return schemaSyncService.waitForMetadataCompleteness(opStartTs)
- .thenApply(unused -> {
- schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs,
tableId());
+ .thenRun(() ->
schemaCompatValidator.failIfTableDoesNotExistAt(opStartTs, tableId()));
+ }
+
+ /**
+ * Makes sure that {@link
SchemaVersionAwareReplicaRequest#schemaVersion()} sent in a request matches
table schema version
+ * corresponding to the operation.
+ *
+ * @param request Replica request corresponding to the operation.
+ * @param opTsIfDirectRo Operation timestamp for a direct RO, {@code null}
otherwise.
+ * @return Future completed when the validation is finished.
+ */
+ private CompletableFuture<Void> validateSchemaMatch(ReplicaRequest
request, @Nullable HybridTimestamp opTsIfDirectRo) {
+ if (!(request instanceof SchemaVersionAwareReplicaRequest)) {
+ return completedFuture(null);
+ }
+
+ SchemaVersionAwareReplicaRequest versionAwareRequest =
(SchemaVersionAwareReplicaRequest) request;
+
+ HybridTimestamp tsToWaitForSchema = getTxStartTimestamp(request);
Review Comment:
It may be a good idea to pass `opTsIfDirectRo` to `getTxStartTimestamp` so
that it will return it instead of `null`, this will make the code shorter
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]