Copilot commented on code in PR #7799:
URL: https://github.com/apache/ignite-3/pull/7799#discussion_r2944806848
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java:
##########
@@ -308,4 +315,16 @@ private static Map<ZonePartitionIdMessage,
PartitionEnlistmentMessage> toEnliste
return messages;
}
+
+ /**
+ * Sends a message to kill a transaction to it's coordinator.
+ *
Review Comment:
The Javadoc says "Sends a message to kill a transaction to it's
coordinator." — "it's" should be "its" (the possessive, not the contraction
of "it is").
##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/TpccBenchmarkNodeRunner.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmark;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getAllResultSet;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteServer;
+import org.apache.ignite.InitParameters;
+import org.apache.ignite.internal.app.IgniteImpl;
+import
org.apache.ignite.internal.failure.handlers.configuration.StopNodeOrHaltFailureHandlerConfigurationSchema;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.Statement.StatementBuilder;
+import org.apache.ignite.tx.Transaction;
+import org.intellij.lang.annotations.Language;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Extendable class to start a dedicated cluster node for TPC-C benchmark.
+ */
+public class TpccBenchmarkNodeRunner {
+ private static final int BASE_PORT = 3344;
+ private static final int BASE_CLIENT_PORT = 10800;
+ private static final int BASE_REST_PORT = 10300;
+
+ private static final List<IgniteServer> igniteServers = new ArrayList<>();
+
+ protected static Ignite publicIgnite;
+ protected static IgniteImpl igniteImpl;
+
+ public static void main(String[] args) throws Exception {
+ TpccBenchmarkNodeRunner runner = new TpccBenchmarkNodeRunner();
+ runner.startCluster();
+ }
+
+ public IgniteImpl node(int idx) {
+ return unwrapIgniteImpl(igniteServers.get(idx).api());
+ }
+
+ private void startCluster() throws Exception {
+ Path workDir = workDir();
+
+ String connectNodeAddr = "\"localhost:" + BASE_PORT + '\"';
+
+ @Language("HOCON")
+ String configTemplate = "ignite {\n"
+ + " \"network\": {\n"
+ + " \"port\":{},\n"
+ + " \"nodeFinder\":{\n"
+ + " \"netClusterNodes\": [ {} ]\n"
+ + " }\n"
+ + " },\n"
+ + " storage.profiles: {"
+ + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, "
+ + " " + DEFAULT_STORAGE_PROFILE + ".sizeBytes: " +
pageMemorySize() + " "
+ + " },\n"
+ + " clientConnector: { port:{} },\n"
+ + " clientConnector.sendServerExceptionStackTraceToClient:
true\n"
+ + " rest.port: {},\n"
+ + " raft.fsync = " + fsync() + ",\n"
+ + " system.partitionsLogPath = \"" + logPath() + "\",\n"
+ + " failureHandler.handler: {\n"
+ + " type: \"" +
StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n"
+ + " tryStop: true,\n"
+ + " timeoutMillis: 60000,\n" // 1 minute for graceful
shutdown
+ + " },\n"
+ + "}";
+
+ for (int i = 0; i < nodes(); i++) {
+ int port = BASE_PORT + i;
+ String nodeName = nodeName(port);
+
+ String config = IgniteStringFormatter.format(configTemplate, port,
connectNodeAddr,
+ BASE_CLIENT_PORT + i, BASE_REST_PORT + i);
+
+
igniteServers.add(TestIgnitionManager.startWithProductionDefaults(nodeName,
config, workDir.resolve(nodeName)));
+ }
+
+ String metaStorageNodeName = nodeName(BASE_PORT);
+
+ InitParameters initParameters = InitParameters.builder()
+ .metaStorageNodeNames(metaStorageNodeName)
+ .clusterName("cluster")
+ .clusterConfiguration(clusterConfiguration())
+ .build();
+
+ TestIgnitionManager.init(igniteServers.get(0), initParameters);
+
+ for (IgniteServer node : igniteServers) {
+ assertThat(node.waitForInitAsync(), willCompleteSuccessfully());
+
+ if (publicIgnite == null) {
+ publicIgnite = node.api();
+ igniteImpl = unwrapIgniteImpl(publicIgnite);
+ }
+ }
+ }
+
+ @Nullable
+ protected String clusterConfiguration() {
+ return "ignite {}";
+ }
+
+ protected static String nodeName(int port) {
+ return "node_" + port;
+ }
+
+ protected Path workDir() throws Exception {
+ return new File("c:/work/tpcc").toPath();
Review Comment:
The `TpccBenchmarkNodeRunner` contains a hardcoded Windows path
(`c:/work/tpcc`). This class appears to be a developer-local benchmark runner
and shouldn't be committed to the repository.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/WoundWaitDeadlockPreventionPolicy.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import java.util.Comparator;
+import java.util.UUID;
+import org.apache.ignite.internal.tx.DeadlockPreventionPolicy;
+import org.apache.ignite.internal.tx.Waiter;
+
+/**
+ * Wound-wait prevention policy. TODO desc.
+ */
Review Comment:
Javadoc says "TODO desc." — the class documentation is incomplete.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -1203,6 +1238,46 @@ public CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGr
});
}
+ @Override
+ public <T> T runInTransaction(Function<Transaction, T> clo,
HybridTimestampTracker observableTimestampTracker,
+ @Nullable TransactionOptions options) {
+ boolean readOnly = options != null && options.readOnly();
+
+ InternalTxOptions internalTxOptions = options == null
+ ? InternalTxOptions.defaults()
+ : InternalTxOptions.builder()
+ .timeoutMillis(options.timeoutMillis())
+ .txLabel(options.label())
+ .build();
+
+ long startTimestamp = IgniteUtils.monotonicMs();
+ long timeout = getTimeoutOrDefault(internalTxOptions,
txConfig.readWriteTimeoutMillis().value());
+ long initialTimeout = startTimestamp + timeout;
+
+ return runInTransactionInternal(old -> {
+ InternalTxOptions opts;
+ if (old != null) {
+// InternalTransaction oldInt = (InternalTransaction) old;
+// UUID id = oldInt.id();
+//
+// int cnt = TransactionIds.retryCnt(id);
+// int nodeId = TransactionIds.nodeId(id);
+// TxPriority priority = TransactionIds.priority(id);
+// UUID retryId =
TransactionIds.transactionId(id.getMostSignificantBits(), cnt + 1, nodeId,
priority);
+
+ opts =
InternalTxOptions.builder().priority(internalTxOptions.priority())
+ //.retryId(retryId)
+ .timeoutMillis(timeout) // TODO
+ .txLabel(internalTxOptions.txLabel()).build();
+
+ //LOG.info("Restarting the transaction [oldId=" + id + ",
newId=" + retryId + ", remaining=" + opts.timeoutMillis());
+ } else {
+ opts = internalTxOptions;
+ }
+ return beginExplicit(observableTimestampTracker, readOnly, opts);
+ }, clo, startTimestamp, initialTimeout);
Review Comment:
The `runInTransaction` method contains significant amounts of commented-out
code (lines 1260-1266, 1269, 1273). This looks like incomplete/experimental
work that shouldn't be merged as-is.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Predicate;
+import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Client transaction inflights tracker.
+ */
+public class PartitionInflights {
+ /** Hint for maximum concurrent txns. */
+ private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
+
+ /** Txn contexts. */
+ private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new
ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
+
+ /**
+ * Registers the inflight update for a transaction.
+ *
+ * @param txId The transaction id.
+ */
+ public @Nullable CleanupContext addInflight(UUID txId, Predicate<UUID>
testPred, RequestType requestType) {
+ boolean[] res = {true};
+
+ CleanupContext ctx0 = txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ ctx = new CleanupContext();
+ }
+
+ //ctx.opFuts.add(new IgniteBiTuple<>(new Exception(), fut));
+
+ if (ctx.finishFut != null || testPred.test(txId)) {
+ res[0] = false;
+ } else {
+ //ctx.addInflight();
+ ctx.inflights.incrementAndGet();
+ ctx.hasWrites = true;
+ }
+
+ return ctx;
+ });
+
+ return res[0] ? ctx0 : null;
+ }
+
+ /**
+ * Unregisters the inflight for a transaction.
+ *
+ * @param ctx Cleanup context.
+ */
+ public static void removeInflight(CleanupContext ctx) {
+ long val = ctx.inflights.decrementAndGet();
+
+ if (ctx.finishFut != null && val == 0) {
+ ctx.finishFut.complete(null);
+ }
+ }
+
+ /**
+ * Get finish future.
+ *
+ * @param txId Transaction id.
+ * @return The future.
+ */
+ public @Nullable CleanupContext finishFuture(UUID txId) {
+ return txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ return null;
+ }
+
+ // LOG.info("DBG: finishFuture " + txId + " " + ctx.inflights);
+
+ if (ctx.finishFut == null) {
+ ctx.finishFut = ctx.inflights.get() == 0 ?
nullCompletedFuture() : new CompletableFuture<>();
+ }
+
+ // Avoiding a data race with a concurrent decrementing thread,
which might not see finishFut publication.
+ if (ctx.inflights.get() == 0 && !ctx.finishFut.isDone()) {
+ ctx.finishFut = nullCompletedFuture();
+ }
+
+ return ctx;
+ });
+ }
+
+ /**
+ * Cleanup inflights context for this transaction.
+ *
+ * @param uuid Tx id.
+ */
+ public void erase(UUID uuid) {
+ txCtxMap.remove(uuid);
+ }
+
+ /**
+ * Check if the inflights map contains a given transaction.
+ *
+ * @param txId Tx id.
+ * @return {@code True} if contains.
+ */
+ public boolean contains(UUID txId) {
+ return txCtxMap.containsKey(txId);
+ }
+
+ /**
+ * Shared Cleanup context.
+ */
+ public static class CleanupContext {
+ volatile CompletableFuture<Void> finishFut;
+ AtomicLong inflights = new AtomicLong(0); // TODO atomic updater
+ volatile boolean hasWrites = false;
+
+// void addInflight() {
+// inflights.incrementAndGet();
+// }
+//
+// void removeInflight(UUID txId) {
+// //assert inflights > 0 : format("No inflights, cannot remove any
[txId={}, ctx={}]", txId, this);
+//
+// inflights.decrementAndGet();
+// }
+ }
+
+ @TestOnly
+ public ConcurrentHashMap<UUID, CleanupContext> map() {
+ return txCtxMap;
+ }
+}
Review Comment:
`PartitionInflights` has commented-out code (lines 54, 59, 95, 137-145) and a
TODO comment (line 134). The class also exposes mutable internal state via
`map()` annotated `@TestOnly`, and the public `CleanupContext` nested class has
package-private mutable fields (`finishFut`, `inflights`, `hasWrites`) that are
accessed directly instead of through proper encapsulation.
This needs cleanup before merge.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteTransactionsImpl.java:
##########
@@ -80,4 +83,18 @@ public InternalTransaction beginImplicit(boolean readOnly) {
public Transaction beginWithPriority(boolean readOnly, TxPriority
priority) {
return txManager.beginExplicit(observableTimestampTracker, readOnly,
InternalTxOptions.defaultsWithPriority(priority));
}
+
+ @Override
+ public <T> T runInTransaction(Function<Transaction, T> clo, @Nullable
TransactionOptions options) throws TransactionException {
+ return txManager.runInTransaction(clo, observableTimestampTracker,
options);
+ }
+
+ @Override
+ public <T> CompletableFuture<T> runInTransactionAsync(
+ Function<Transaction, CompletableFuture<T>> clo,
+ @Nullable TransactionOptions options
+ ) {
+ //return txManager.runInTransaction(clo, observableTimestampTracker,
tx);
+ return CompletableFuture.failedFuture(new Exception());
Review Comment:
`runInTransactionAsync` returns a `failedFuture(new Exception())` — this is
a stub that will break any caller of this API. Either implement it properly or
throw `UnsupportedOperationException` with a clear message.
##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java:
##########
@@ -108,4 +110,15 @@ private static ClientTransaction readTx(
return new ClientTransaction(r.clientChannel(), ch, id, isReadOnly,
EMPTY, null, EMPTY, null, timeout);
}
+
+ @Override
+ public <T> T runInTransaction(Function<Transaction, T> clo, @Nullable
TransactionOptions options) throws TransactionException {
+ throw new IllegalArgumentException();
+ }
+
+ @Override
+ public <T> CompletableFuture<T>
runInTransactionAsync(Function<Transaction, CompletableFuture<T>> clo,
+ @Nullable TransactionOptions options) {
+ throw new IllegalArgumentException();
Review Comment:
The `ClientTransactions` implementation throws `IllegalArgumentException`
instead of providing a proper implementation or
`UnsupportedOperationException`. This will produce confusing errors for thin
client users.
##########
modules/transactions/src/testFixtures/java/org/apache/ignite/internal/tx/test/LockWaiterMatcher.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.test;
+
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.ignite.internal.tx.Lock;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
+
+public class LockWaiterMatcher extends
TypeSafeMatcher<CompletableFuture<Lock>> {
+ private final UUID waiterId;
+ private CompletableFuture<Lock> item;
+
+ private LockWaiterMatcher(UUID txId) {
+ this.waiterId = txId;
+ }
+
+ @Override
+ protected boolean matchesSafely(CompletableFuture<Lock> item) {
+ try {
+ this.item = item;
+ item.get(50, TimeUnit.MILLISECONDS);
+ return false; // Timeout exception is expected.
+ } catch (TimeoutException e) {
+ return true;
+ } catch (InterruptedException | ExecutionException |
CancellationException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ @Override
+ protected void describeMismatchSafely(CompletableFuture<Lock> item,
Description mismatchDescription) {
+ mismatchDescription.appendText("lock future is completed
").appendValue(item);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("lock future which should wait for
").appendValue(waiterId);
+ }
+
+ public static LockWaiterMatcher waitsFor(UUID... txIds) {
+ return new LockWaiterMatcher(txIds[0]);
+ }
Review Comment:
The `LockWaiterMatcher` has an unused field `item` (line 32) that is
assigned in `matchesSafely` but never read. Additionally, the `waitsFor(UUID...
txIds)` factory method accepts varargs but only uses `txIds[0]`, silently
ignoring additional arguments. This is misleading API design.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java:
##########
@@ -1031,15 +1035,44 @@ public void processDelayedAck(Object ignored, @Nullable
Throwable err) {
@Override
public CompletableFuture<Void> startAsync(ComponentContext
componentContext) {
- var deadlockPreventionPolicy = new
DeadlockPreventionPolicyImpl(DEFAULT_TX_ID_COMPARATOR, DEFAULT_LOCK_TIMEOUT);
+ var deadlockPreventionPolicy = new WoundWaitDeadlockPreventionPolicy()
{
+ @Override
+ public long waitTimeout() {
+ return DEFAULT_LOCK_TIMEOUT;
+ }
+
+ @Override
+ public void failAction(UUID owner) {
+ // TODO resolve tx with ABORT and delete locks
+ TxStateMeta state = txStateVolatileStorage.state(owner);
+ if (state == null || state.txCoordinatorId() == null) {
+ return; // tx state is invalid. locks should be cleaned up
by tx recovery process.
+ }
+
+ InternalClusterNode coordinator =
topologyService.getById(state.txCoordinatorId());
+ if (coordinator == null) {
+ return; // tx is abandoned. locks should be cleaned up by
tx recovery process.
+ }
+
+ txMessageSender.kill(coordinator, owner);
+ }
+ };
+
+// var deadlockPreventionPolicy = new WaitDieDeadlockPreventionPolicy()
{
+// @Override
+// public long waitTimeout() {
+// return DEFAULT_LOCK_TIMEOUT;
+// }
+// };
Review Comment:
There is a large block of commented-out code (lines 1061-1066) and multiple
TODO/debug comments throughout this method. This should be cleaned up before
merging.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TraceableFuture.java:
##########
@@ -0,0 +1,20 @@
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import java.io.StringWriter;
+import java.util.concurrent.CompletableFuture;
+
+public class TraceableFuture<T> extends CompletableFuture<T> {
+ private StringWriter log = new StringWriter();
+
+ public synchronized void log(String msg) {
+ log.append("<" + msg + ">");
+ }
+
+ public String message() {
+ String str;
+ synchronized (this) {
+ str = log.toString();
+ }
+ return str;
+ }
+}
Review Comment:
`TraceableFuture` is missing a license header. Also, it appears to be a
debug utility that is not used in production code — it should be removed or
moved to test fixtures.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxMessageSender.java:
##########
@@ -132,6 +137,8 @@ public CompletableFuture<NetworkMessage> cleanup(
boolean commit,
@Nullable HybridTimestamp commitTimestamp
) {
+ //LOG.info("DBG: send cleanup " + txId);
Review Comment:
Commented-out debug logging (`//LOG.info("DBG: send cleanup"...`) should be
removed before merging.
##########
modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java:
##########
@@ -342,14 +331,8 @@ default <T> CompletableFuture<T>
runInTransactionAsync(Function<Transaction, Com
* @param <T> Closure result type.
* @return The result.
*/
- default <T> CompletableFuture<T> runInTransactionAsync(
+ <T> CompletableFuture<T> runInTransactionAsync(
Function<Transaction, CompletableFuture<T>> clo,
@Nullable TransactionOptions options
- ) {
- // This start timestamp is not related to transaction's begin
timestamp and only serves as local time for counting the timeout of
- // possible retries.
- long startTimestamp = System.currentTimeMillis();
- long initialTimeout = options == null ?
TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS) :
options.timeoutMillis();
- return runInTransactionAsyncInternal(this, clo, options,
startTimestamp, initialTimeout, null);
- }
+ );
Review Comment:
The `runInTransaction` default implementations were removed from the
`IgniteTransactions` interface and the methods were made abstract. This is a
breaking API change — any existing third-party implementation of
`IgniteTransactions` will fail to compile. The previous design with default
methods was more backwards-compatible.
##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java:
##########
@@ -87,7 +87,7 @@ public void committedGroupConfiguration(byte[] config) {
@Override
public ReadResult read(RowId rowId, HybridTimestamp timestamp) throws
StorageException {
- assertThreadAllowsToRead();
+ //assertThreadAllowsToRead();
Review Comment:
This line disables a thread safety assertion (`assertThreadAllowsToRead`).
This looks like a debug workaround that should not be merged — it weakens the
thread safety guarantees of the storage layer.
##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java:
##########
@@ -104,14 +111,18 @@ public class HeapLockManager extends
AbstractEventProducer<LockEvent, LockEventP
private Executor delayedExecutor;
/** Enlisted transactions. */
- private final ConcurrentHashMap<UUID, ConcurrentLinkedQueue<Releasable>>
txMap = new ConcurrentHashMap<>(1024);
+ private final ConcurrentHashMap<UUID, SealableQueue> txMap = new
ConcurrentHashMap<>(1024);
/** Coarse locks. */
private final ConcurrentHashMap<Object, CoarseLockState> coarseMap = new
ConcurrentHashMap<>();
/** Tx state required to present tx labels in logs and exceptions. */
private final VolatileTxStateMetaStorage txStateVolatileStorage;
+ private static class SealableQueue extends
ConcurrentLinkedQueue<Releasable> {
+ boolean sealed;
Review Comment:
The `sealed` field on `SealableQueue` is not volatile or otherwise
synchronized, but it's accessed under `txMap.compute` and also read
independently via the `sealed()` method. This can lead to visibility issues in
`tryAcquireInternal` where `sealed(waiter.txId)` is called outside of
`txMap.compute`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]