Copilot commented on code in PR #6779:
URL: https://github.com/apache/ignite-3/pull/6779#discussion_r2536665861
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java:
##########
@@ -409,12 +435,19 @@ public static TableNotFoundException
tableIdNotFoundException(Integer tableId) {
boolean readOnly = in.unpackBoolean();
long timeoutMillis = in.unpackLong();
- InternalTxOptions txOptions = InternalTxOptions.builder()
- .timeoutMillis(timeoutMillis)
- .build();
+ var builder =
InternalTxOptions.builder().timeoutMillis(timeoutMillis);
+ if (options.contains(RequestOptions.HAS_PRIORITY)) {
+ boolean lowPriority = in.unpackBoolean();
+ // Currently we use low priority with getAll fragments to
avoid conflicts with subsequent explicit RW transactions,
+ // because locks are released asynchronously. This makes
client's getAll a subject for starvation.
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27039
+ if (lowPriority) {
+ builder.priority(TxPriority.LOW);
Review Comment:
[nitpick] There's a potential issue with the condition check. The code
checks if `HAS_PRIORITY` option is present but only applies low priority if
`lowPriority` is true. However, when the feature is supported but not enabled
(e.g., in compatibility mode), the client might not send the priority flag at
all, which could cause an unpacking error. Consider validating that the
unpacker has data available before reading:
```java
if (options.contains(RequestOptions.HAS_PRIORITY)) {
boolean lowPriority = in.unpackBoolean();
if (lowPriority) {
builder.priority(TxPriority.LOW);
}
}
```
This appears correct, but ensure the client always sends the boolean when
the feature is supported on the server side.
```suggestion
// Only unpack the boolean if it is present (not nil).
if (!in.tryUnpackNil()) {
boolean lowPriority = in.unpackBoolean();
// Currently we use low priority with getAll
fragments to avoid conflicts with subsequent explicit RW transactions,
// because locks are released asynchronously. This
makes client's getAll a subject for starvation.
// TODO
https://issues.apache.org/jira/browse/IGNITE-27039
if (lowPriority) {
builder.priority(TxPriority.LOW);
}
```
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java:
##########
@@ -739,6 +740,29 @@ int tryGetPartitionCount() {
return partitionCount;
}
+ /**
+ * Implicit getAll/containsAll transaction is executed as multiple
independent transactions with lightweight coordination from a client.
+ * TODO https://issues.apache.org/jira/browse/IGNITE-27040
Review Comment:
[nitpick] The TODO comment lacks a description of what needs to be
addressed. Consider adding context:
```java
// TODO https://issues.apache.org/jira/browse/IGNITE-27040 Document or
refactor fragmented transaction approach
```
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -855,6 +881,136 @@ void testBatchScenarioWithNoopEnlistmentImplicit() {
assertEquals(batch.size(), view.removeAll(null,
batch.keySet()).size());
}
+ @Test
+ void testImplicitDirectMapping() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
+
+ assertEquals(2, tuples0.size());
+ assertEquals(1, tuples1.size());
+
+ Map<Tuple, Tuple> batch = new HashMap<>();
+
+ for (Tuple tup : tuples0) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ for (Tuple tup : tuples1) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ KeyValueView<Tuple, Tuple> view = table.keyValueView();
+ Transaction tx = client().transactions().begin();
+ view.putAll(tx, batch);
+
+ // Should retry until timeout.
+ CompletableFuture<Map<Tuple, Tuple>> fut = view.getAllAsync(null,
batch.keySet());
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertFalse(fut.isDone());
+ tx.commit();
+
+ assertEquals(batch.size(), fut.join().size(), "Implicit tx should be
retried until timeout");
+
+ // Retry transaction without other locker.
+ assertEquals(batch.size(), view.getAll(null, batch.keySet()).size());
+
+ // Retry expliti transaction.
+ Transaction tx1 = client().transactions().begin();
+ assertEquals(batch.size(), view.getAll(tx1, batch.keySet()).size());
+ tx1.commit();
+
+ // Test if we don't stuck in locks in subsequent rw txn.
+ CompletableFuture<Void> fut0 = CompletableFuture.runAsync(() -> {
+ Transaction tx0 = client().transactions().begin();
+ view.put(tx0, tuples0.get(0), val("newval0"));
+ tx0.commit();
+ });
+
+ CompletableFuture<Void> fut1 = CompletableFuture.runAsync(() -> {
+ view.put(null, tuples1.get(0), val("newval1"));
+ });
+
+ fut0.join();
+ fut1.join();
+ }
+
+ @Test
+ void testImplicitRecordDirectMapping() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> keys0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
+ List<Tuple> keys1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
+
+ assertEquals(2, keys0.size());
+ assertEquals(1, keys1.size());
+
+ List<Tuple> keys = new ArrayList<>();
+ List<Tuple> recsBatch = new ArrayList<>();
+
+ for (Tuple tup : keys0) {
+ recsBatch.add(kv(tup.intValue(0), tup.intValue(0) + ""));
+ keys.add(tup);
+ }
+
+ for (Tuple tup : keys1) {
+ recsBatch.add(kv(tup.intValue(0), tup.intValue(0) + ""));
+ keys.add(tup);
+ }
+
+ RecordView<Tuple> view = table.recordView();
+ Transaction tx = client().transactions().begin();
+ view.upsertAll(tx, recsBatch);
+
+ // Should retry until timeout.
+ CompletableFuture<List<Tuple>> fut = view.getAllAsync(null, keys);
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertFalse(fut.isDone());
+ tx.commit();
+
+ assertEquals(recsBatch.size(), fut.join().size(), "Implicit tx should
be retried until timeout");
+
+ // Retry transaction without other locker.
+ assertEquals(recsBatch.size(), view.getAll(null, keys).size());
+
+ // Retry explitit transaction.
Review Comment:
Typo in comment: "expliti" should be "explicit"
##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -389,9 +391,9 @@ private void handleReplicaRequest(ReplicaRequest request,
InternalClusterNode se
msg = prepareReplicaResponse(sendTimestamp, res);
} else {
if (indicatesUnexpectedProblem(ex)) {
- throttledLog.warn("Failed to process replica request
[request={}].", ex, request);
+ throttledLog.warn(THROTTLE_REQUEST_KEY, "{}
[request={}].", ex, THROTTLE_REQUEST_KEY, request);
} else {
- throttledLog.debug("Failed to process replica request
[request={}].", ex, request);
+ throttledLog.debug(THROTTLE_REQUEST_KEY, "{}
[request={}].", ex, THROTTLE_REQUEST_KEY, request);
Review Comment:
The log message formatting appears incorrect. The throttle key is being
logged twice - once as the message template placeholder and again as a
parameter. This should be:
```java
throttledLog.warn(THROTTLE_REQUEST_KEY, "Failed to process replica request
[request={}].", ex, request);
```
The current code will result in messages like "Failed to process replica
request [request=...]" being duplicated.
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -855,6 +881,136 @@ void testBatchScenarioWithNoopEnlistmentImplicit() {
assertEquals(batch.size(), view.removeAll(null,
batch.keySet()).size());
}
+ @Test
+ void testImplicitDirectMapping() {
+ Map<Partition, ClusterNode> map =
table().partitionManager().primaryReplicasAsync().join();
+
+ ClientTable table = (ClientTable) table();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(600, 2, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(610, 1, map,
server1.cluster().localNode(), table);
+
+ assertEquals(2, tuples0.size());
+ assertEquals(1, tuples1.size());
+
+ Map<Tuple, Tuple> batch = new HashMap<>();
+
+ for (Tuple tup : tuples0) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ for (Tuple tup : tuples1) {
+ batch.put(tup, val(tup.intValue(0) + ""));
+ }
+
+ KeyValueView<Tuple, Tuple> view = table.keyValueView();
+ Transaction tx = client().transactions().begin();
+ view.putAll(tx, batch);
+
+ // Should retry until timeout.
+ CompletableFuture<Map<Tuple, Tuple>> fut = view.getAllAsync(null,
batch.keySet());
+
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ assertFalse(fut.isDone());
+ tx.commit();
+
+ assertEquals(batch.size(), fut.join().size(), "Implicit tx should be
retried until timeout");
+
+ // Retry transaction without other locker.
+ assertEquals(batch.size(), view.getAll(null, batch.keySet()).size());
+
+ // Retry expliti transaction.
Review Comment:
Typo in comment: "Retry expliti transaction" should be "Retry explicit
transaction"
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java:
##########
@@ -481,7 +480,7 @@ public CompletableFuture<List<R>>
deleteAllExactAsync(@Nullable Transaction tx,
return emptyListCompletedFuture();
}
- BiFunction<Collection<R>, PartitionAwarenessProvider,
CompletableFuture<List<R>>> clo = (batch, provider) -> {
+ MapFunction<R, List<R>> clo = (batch, provider, startImlpicit) -> {
Review Comment:
Typo in parameter name: "startImlpicit" should be "startImplicit"
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java:
##########
@@ -531,6 +531,32 @@ static List<Tuple> generateKeysForNode(
return keys;
}
+ static List<Tuple> generateKeysForPartition(
+ int start,
+ int count,
+ Map<Partition, ClusterNode> map,
+ int partId,
+ Table table
+ ) {
+ List<Tuple> keys = new ArrayList<>();
+ PartitionManager partitionManager = table.partitionManager();
+
+ int k = start;
+ while (keys.size() != count) {
+ k++;
+ Tuple t = key(k);
+
+ Partition part = partitionManager.partitionAsync(t).orTimeout(5,
TimeUnit.SECONDS).join();
+ HashPartition hashPart = (HashPartition) part;
+
+ if (hashPart.partitionId() == partId) {
+ keys.add(t);
+ }
+ }
+
+ return keys;
+ }
Review Comment:
[nitpick] The `generateKeysForPartition` function is added but appears to be
unused in the visible test code. If this is intentional for future use,
consider adding a `@SuppressWarnings("unused")` annotation or a comment
explaining its purpose. Otherwise, if it's not needed, it should be removed.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java:
##########
@@ -409,12 +435,19 @@ public static TableNotFoundException
tableIdNotFoundException(Integer tableId) {
boolean readOnly = in.unpackBoolean();
long timeoutMillis = in.unpackLong();
- InternalTxOptions txOptions = InternalTxOptions.builder()
- .timeoutMillis(timeoutMillis)
- .build();
+ var builder =
InternalTxOptions.builder().timeoutMillis(timeoutMillis);
+ if (options.contains(RequestOptions.HAS_PRIORITY)) {
+ boolean lowPriority = in.unpackBoolean();
+ // Currently we use low priority with getAll fragments to
avoid conflicts with subsequent explicit RW transactions,
+ // because locks are released asynchronously. This makes
client's getAll a subject for starvation.
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27039
Review Comment:
[nitpick] The TODO comment lacks context about what needs to be done.
Consider adding a description:
```java
// Currently we use low priority with getAll fragments to avoid conflicts
with subsequent explicit RW transactions,
// because locks are released asynchronously. This makes client's getAll a
subject for starvation.
// TODO https://issues.apache.org/jira/browse/IGNITE-27039 Implement proper
priority management or synchronous lock release
```
```suggestion
// TODO: Implement proper priority management or
synchronous lock release to avoid starvation of getAll fragments. See
https://issues.apache.org/jira/browse/IGNITE-27039
```
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java:
##########
@@ -409,12 +435,19 @@ public static TableNotFoundException
tableIdNotFoundException(Integer tableId) {
boolean readOnly = in.unpackBoolean();
long timeoutMillis = in.unpackLong();
- InternalTxOptions txOptions = InternalTxOptions.builder()
- .timeoutMillis(timeoutMillis)
- .build();
+ var builder =
InternalTxOptions.builder().timeoutMillis(timeoutMillis);
+ if (options.contains(RequestOptions.HAS_PRIORITY)) {
Review Comment:
Extra space before the opening brace in the if condition. Should be:
```java
if (options.contains(RequestOptions.HAS_PRIORITY)) {
```
instead of:
```java
if (options.contains(RequestOptions.HAS_PRIORITY)) {
```
```suggestion
if (options.contains(RequestOptions.HAS_PRIORITY)) {
```
##########
modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java:
##########
@@ -434,7 +438,12 @@ private <T> CompletableFuture<T> send(
return resFut;
} catch (Throwable t) {
- log.warn("Failed to send request [id=" + id + ", op=" + opCode +
", remoteAddress=" + cfg.getAddress() + "]: "
+ if (expectedException) {
+ // Just re-throw.
+ throw sneakyThrow(t);
+ }
+
+ log.warn("Failed to send request sync [id=" + id + ", op=" +
opCode + ", remoteAddress=" + cfg.getAddress() + "]: "
+ t.getMessage(), t);
Review Comment:
The error handling logic has an issue. When `expectedException` is true and
an exception is re-thrown at line 443, the code at lines 446-447 will still
execute because the `throw` at line 443 uses `sneakyThrow()` which doesn't
actually return. This will cause the warning to be logged even for expected
exceptions. Consider restructuring to avoid this:
```java
if (!expectedException) {
log.warn("Failed to send request sync [id=" + id + ", op=" + opCode + ",
remoteAddress=" + cfg.getAddress() + "]: "
+ t.getMessage(), t);
}
throw sneakyThrow(expectedException ? t :
ViewUtils.ensurePublicException(t));
```
##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.table;
+
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_PRIORITY;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.client.table.ClientTable.Batch;
+import org.apache.ignite.internal.client.table.ClientTable.Reducer;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides batch map utility methods.
+ */
+class ClientTableMapUtils {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27073
+ private static final long DEFAULT_IMPLICIT_GET_ALL_TIMEOUT_NANOS =
TimeUnit.MILLISECONDS.toNanos(5000);
Review Comment:
[nitpick] The TODO comment references a JIRA issue but lacks context. It
would be helpful to add a brief description of what needs to be addressed. For
example:
```java
// TODO https://issues.apache.org/jira/browse/IGNITE-27073 Make timeout
configurable
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]