rpuch commented on code in PR #2566:
URL: https://github.com/apache/ignite-3/pull/2566#discussion_r1322944983
##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java:
##########
@@ -1693,8 +1743,107 @@ private static Stream<Arguments>
multiRowsWriteRequestTypes() {
.map(Arguments::of);
}
- private static UUID beginTx() {
- return TestTransactionIds.newTransactionId();
+ @CartesianTest
+ @CartesianTest.MethodFactory("singleRowReadOrWriteRequestTypesFactory")
+ void
singleRowRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+ RequestType requestType, boolean onExistingRow, boolean full
+ ) {
+
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(onExistingRow,
(targetTxId, key) -> {
+ return doSingleRowRequest(targetTxId,
marshalKeyOrKeyValue(requestType, key), requestType);
+ });
+ }
+
+ @SuppressWarnings("unused")
+ private static ArgumentSets singleRowReadOrWriteRequestTypesFactory() {
+ return
ArgumentSets.argumentsForFirstParameter(singleRowReadOrWriteRequestTypes())
+ .argumentsForNextParameter(false, true)
+ .argumentsForNextParameter(false, true);
+ }
+
+ private static Stream<RequestType> singleRowReadOrWriteRequestTypes() {
+ return Arrays.stream(RequestType.values())
+ .filter(type -> RequestTypes.isSingleRowRwRead(type) ||
RequestTypes.isSingleRowWrite(type));
+ }
+
+ private void
testRwReadsAndWritesFailIfTableSchemaVersionIncreasedSinceTxStart(
+ boolean onExistingRow, ListenerInvocation listenerInvocation
+ ) {
+ TestKey key = nextKey();
+
+ if (onExistingRow) {
+ upsertInNewTxFor(key);
+ }
+
+ UUID txId = beginTx();
+ HybridTimestamp txBeginTs = TransactionIds.beginTimestamp(txId);
+
+ CatalogTableDescriptor tableVersion1 =
mock(CatalogTableDescriptor.class);
+ CatalogTableDescriptor tableVersion2 =
mock(CatalogTableDescriptor.class);
+ when(tableVersion1.tableVersion()).thenReturn(CURRENT_SCHEMA_VERSION);
+ when(tableVersion2.tableVersion()).thenReturn(NEXT_SCHEMA_VERSION);
+
+ when(catalogTables.table(tblId,
txBeginTs.longValue())).thenReturn(tableVersion1);
+ when(catalogTables.table(eq(tblId),
gt(txBeginTs.longValue()))).thenReturn(tableVersion2);
+
+ CompletableFuture<?> future = listenerInvocation.invoke(txId, key);
+
+ TransactionException ex = assertWillThrowFast(future,
TransactionException.class);
+ assertThat(ex.code(), is(Transactions.TX_INCOMPATIBLE_SCHEMA_ERR));
+ assertThat(
+ ex.getMessage(),
+ is("Table schema was updated since the transaction was started
[table=1, startSchema=1, operationSchema=2")
Review Comment:
It was not, good catch!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]