timoninmaxim commented on code in PR #11528:
URL: https://github.com/apache/ignite/pull/11528#discussion_r1795307233
##########
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java:
##########
@@ -160,6 +164,12 @@ public class JdbcThinTcpIo {
/** Protocol context (version, supported features, etc). */
private JdbcProtocolContext protoCtx;
+ /**
+ * Transaction modes supported by the server.
+ * @see
org.apache.ignite.configuration.TransactionConfiguration#TX_AWARE_QUERIES_SUPPORTED_MODES
+ */
+ private Set<TransactionIsolation> isolationLevelsSupported;
Review Comment:
final
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcConnectionContext.java:
##########
@@ -274,6 +284,41 @@ public JdbcConnectionContext(GridKernalContext ctx,
GridNioSession ses, GridSpin
super.onDisconnected();
}
+ /** {@inheritDoc} */
+ @Override public ClientTxContext txContext(int txId) {
+ ensureSameTransaction(txId);
+
+ return txCtx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addTxContext(ClientTxContext txCtx) {
+ if (this.txCtx != null)
+ throw new IgniteSQLException("Too many transactions",
IgniteQueryErrorCode.QUERY_CANCELED);
+
+ this.txCtx = txCtx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeTxContext(int txId) {
+ ensureSameTransaction(txId);
+
+ txCtx = null;
+ }
+
+ /** */
+ private void ensureSameTransaction(int txId) {
+ if (txCtx != null && txCtx.txId() != txId) {
+ throw new IllegalStateException("Unknown transaction " +
+ "[serverTxId=" + (txCtx == null ? null : txCtx.txId()) + ",
txId=" + txId + ']');
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void cleanupTxs() {
+ txCtx = null;
Review Comment:
`txCtx.close()` before setting to null? Similar to `ClientConnectionContext`
##########
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java:
##########
@@ -247,6 +248,23 @@ public class ConnectionPropertiesImpl implements
ConnectionProperties, Serializa
private final StringProperty qryEngine = new StringProperty("queryEngine",
"Use specified SQL query engine for a connection.", null, null, false,
null);
+ /** Transaction concurrency. */
+ private StringProperty transactionConcurrency = new
StringProperty("transactionConcurrency",
Review Comment:
final
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java:
##########
@@ -36,6 +38,9 @@
* Base connection context.
*/
public abstract class ClientListenerAbstractConnectionContext implements
ClientListenerConnectionContext {
+ /** */
Review Comment:
Let's add doc, that it is used as value for `txId`.
##########
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/ConnectionPropertiesImpl.java:
##########
@@ -247,6 +248,23 @@ public class ConnectionPropertiesImpl implements
ConnectionProperties, Serializa
private final StringProperty qryEngine = new StringProperty("queryEngine",
"Use specified SQL query engine for a connection.", null, null, false,
null);
+ /** Transaction concurrency. */
+ private StringProperty transactionConcurrency = new
StringProperty("transactionConcurrency",
+ "Transaction concurrencty level.",
+ TransactionConcurrency.OPTIMISTIC.name(),
+ new String[]{TransactionConcurrency.OPTIMISTIC.name(),
TransactionConcurrency.PESSIMISTIC.name()},
+ false,
+ null
+ );
+
+ /** JDBC transaction timeout. */
+ private IntegerProperty transactionTimeout = new
IntegerProperty("transactionTimeout",
Review Comment:
final
##########
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java:
##########
@@ -296,6 +307,11 @@ public JdbcThinConnection(ConnectionProperties connProps)
throws SQLException {
baseEndpointVer = null;
}
+
+ holdability = isTxAwareQueriesSupported ? CLOSE_CURSORS_AT_COMMIT :
HOLD_CURSORS_OVER_COMMIT;
+ txIsolation = defaultTransactionIsolation();
+
+ updateTransactionParameters();
Review Comment:
Looks like setting transaction params can be done on handshake. To avoid
multiple sequential requests in case of partition awareness.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientTxSupport.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import
org.apache.ignite.internal.processors.platform.client.tx.ClientTxContext;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+
+/** */
+public interface ClientTxSupport {
+ /**
+ * Starts new client transaction.
+ *
+ * @param ctx Client connection context.
+ * @param concurrency Transaction concurrency.
+ * @param isolation Transaction isolation.
+ * @param timeout Transaction timeout.
+ * @param lb Transaction label.
+ * @return Transaction id.
+ */
+ default int startClientTransaction(
+ ClientListenerAbstractConnectionContext ctx,
+ TransactionConcurrency concurrency,
+ TransactionIsolation isolation,
+ long timeout,
+ String lb
+ ) {
+ GridNearTxLocal tx;
+
+ ctx.kernalContext().gateway().readLock();
+
+ try {
+ tx = ctx.kernalContext().cache().context().tm().newTx(
+ false,
+ false,
+ null,
+ concurrency,
+ isolation,
+ timeout,
+ true,
+ 0,
+ lb
+ );
+ }
+ finally {
+ ctx.kernalContext().gateway().readUnlock();
+ }
+
+ try {
+ tx.suspend();
+
+ int txId = ctx.nextTxId();
+
+ ctx.addTxContext(new ClientTxContext(txId, tx));
+
+ return txId;
+ }
+ catch (Exception e) {
+ try {
+ tx.close();
+ }
+ catch (Exception e1) {
+ e.addSuppressed(e1);
+ }
+
+ throw startTxException(e);
+ }
+ }
+
+ /**
+ * End transaction asynchronously.
+ * @param ctx Client connection context.
+ * @param txId Transaction id.
+ * @param committed If {@code true} transaction must be committed,
rollback otherwise.
+ */
+ default IgniteInternalFuture<IgniteInternalTx>
endTxAsync(ClientListenerAbstractConnectionContext ctx, int txId, boolean
committed) {
+ ClientTxContext txCtx = ctx.txContext(txId);
+
+ if (txCtx == null && !committed)
+ return new GridFinishedFuture<>();
+
+ if (txCtx == null)
+ throw transactionNotFoundException();
+
+ try {
+ txCtx.acquire(committed);
+
+ if (committed)
+ return txCtx.tx().context().commitTxAsync(txCtx.tx());
+ else
+ return txCtx.tx().rollbackAsync();
+ }
+ catch (IgniteCheckedException e) {
+ throw endTxException(e);
+ }
+ finally {
+ ctx.removeTxContext(txId);
+
+ try {
+ txCtx.release(false);
+ }
+ catch (IgniteCheckedException ignore) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * @param cause Exception cause.
+ * @return Protocol specific start transaction exception.
+ */
+ default RuntimeException startTxException(Exception cause) {
Review Comment:
There is no need to build hierarchy for these one-liners for exception
handling. Let's make class TransactionUtils with static methods. Exceptions can
be re-throwed in `start` and `end` and then re-catched in callers.
##########
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java:
##########
@@ -2464,4 +2610,72 @@ boolean handleResult(long reqId, JdbcResult res) {
return handled;
}
}
+
+ /** Transaction context. */
+ private class TxContext {
+ /** IO to transaction coordinator. */
+ final JdbcThinTcpIo txIo;
+
+ /** Transaction id. */
+ final int txId;
+
+ /** Closed flag. */
+ boolean closed;
+
+ /** Tracked statements to close results on transaction end. */
+ private final Set<JdbcThinStatement> stmts =
Collections.newSetFromMap(new IdentityHashMap<>());
+
+ /** */
+ public TxContext(JdbcThinTcpIo txIo, int txId) {
+ assert !autoCommit;
+
+ this.txIo = txIo;
+ this.txId = txId;
+ }
+
+ /** */
+ public void end(boolean commit) throws SQLException {
+ if (closed)
+ return;
+
+ closed = true;
+
+ for (JdbcThinStatement stmt : stmts)
+ stmt.closeResults();
Review Comment:
why not just close?
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java:
##########
@@ -746,8 +781,80 @@ else if (X.cause(e, IgniteSQLException.class) != null) {
}
}
+ /**
+ * Invokes {@code qry} inside a transaction with id equals to {@code txId}.
+ */
+ private List<FieldsQueryCursor<List<?>>> invokeInTransaction(
+ int txId,
+ boolean autoCommit,
+ SqlFieldsQueryEx qry,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ ClientTxContext txCtx = connCtx.txContext(txId);
+
+ if (txCtx == null)
+ throw new IgniteException("Transaction not found [txId=" + txId +
']');
+
+ boolean err = false;
+
+ try {
+ txCtx.acquire(true);
+
+ return invokeOutsideTransaction(qry, cancel);
+ }
+ catch (Exception e) {
+ err = true;
+
+ if (autoCommit)
Review Comment:
is invoked in finally block. It's enough to persist `err` field and use as
param in `endTransaction(qry, txId, err == null)`
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAbstractConnectionContext.java:
##########
@@ -149,6 +159,39 @@ public String clientDescriptor() {
return clientDesc;
}
+ /**
+ * Next transaction id for this connection.
+ */
+ public int nextTxId() {
+ int txId = txIdSeq.incrementAndGet();
+
+ return txId == NO_TX ? txIdSeq.incrementAndGet() : txId;
+ }
+
+ /**
+ * Transaction context by transaction id.
+ *
+ * @param txId Tx ID.
+ */
+ public abstract ClientTxContext txContext(int txId);
Review Comment:
not fixed
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcSetTxParametersRequest.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * JDBC sets transactions parameters for connection.
+ */
+public class JdbcSetTxParametersRequest extends JdbcRequest {
+ /** Transaction concurrency control. */
+ private TransactionConcurrency concurrency;
+
+ /** Transaction isolation level. */
+ private @Nullable TransactionIsolation isolation;
+
+ /** Transaction timeout. */
+ private long timeout;
Review Comment:
int
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java:
##########
@@ -746,8 +781,80 @@ else if (X.cause(e, IgniteSQLException.class) != null) {
}
}
+ /**
+ * Invokes {@code qry} inside a transaction with id equals to {@code txId}.
+ */
+ private List<FieldsQueryCursor<List<?>>> invokeInTransaction(
+ int txId,
+ boolean autoCommit,
+ SqlFieldsQueryEx qry,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ ClientTxContext txCtx = connCtx.txContext(txId);
+
+ if (txCtx == null)
+ throw new IgniteException("Transaction not found [txId=" + txId +
']');
+
+ boolean err = false;
+
+ try {
+ txCtx.acquire(true);
+
+ return invokeOutsideTransaction(qry, cancel);
+ }
+ catch (Exception e) {
+ err = true;
+
+ if (autoCommit)
+ endTransaction(qry, txId, false);
+
+ throw e;
+ }
+ finally {
+ try {
+ txCtx.release(true);
+ }
+ catch (Exception e) {
+ log.warning("Failed to release client transaction context", e);
+ }
+
+ if (autoCommit && !err)
+ endTransaction(qry, txId, true);
+ }
+ }
+
+ /** @return {@code True} if transaction enabled fro connection, {@code
false} otherwise. */
+ private boolean txEnabledForConnection() {
+ return
connCtx.protocolContext().isFeatureSupported(JdbcThinFeature.TX_AWARE_QUERIES)
+ && cliCtx.isolation() != null;
+ }
+
+ /** */
+ private int txId(int txId) {
+ if (txId != NONE_TX || !txEnabledForConnection())
+ return txId;
+
+ return startClientTransaction(
+ connCtx,
+ cliCtx.concurrency(),
+ cliCtx.isolation(),
+ cliCtx.transactionTimeout(),
+ cliCtx.transactionLabel()
+ );
+ }
+
/** */
- private List<FieldsQueryCursor<List<?>>> querySqlFields(SqlFieldsQueryEx
qry, GridQueryCancel cancel) {
+ private void endTransaction(SqlFieldsQueryEx qry, int txId, boolean
committed) throws IgniteCheckedException {
+ IgniteInternalFuture<IgniteInternalTx> endTxFut = endTxAsync(connCtx,
txId, committed);
+
+ if (qry.getTimeout() != -1)
+ endTxFut.get(qry.getTimeout());
+ else
+ endTxFut.get();
+ }
+
+ /** */
+ private List<FieldsQueryCursor<List<?>>>
invokeOutsideTransaction(SqlFieldsQueryEx qry, GridQueryCancel cancel) {
Review Comment:
Let's name it `querySqlFields`? Currently name is misleading as It's
actually perform in transaction if called from `invokeInTransaction`.
##########
modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinConnection.java:
##########
@@ -613,18 +709,27 @@ private void checkCursorOptions(int resSetType, int
resSetConcurrency) throws SQ
ensureNotClosed();
switch (level) {
+ case Connection.TRANSACTION_NONE:
+ break;
case Connection.TRANSACTION_READ_UNCOMMITTED:
+ if (isTxAwareQueriesSupported)
+ throw new SQLException("Requested isolation level not
supported by the server: " + level);
+
case Connection.TRANSACTION_READ_COMMITTED:
case Connection.TRANSACTION_REPEATABLE_READ:
case Connection.TRANSACTION_SERIALIZABLE:
- case Connection.TRANSACTION_NONE:
+ if (isTxAwareQueriesSupported &&
!isolationLevelSupported(level))
+ throw new SQLException("Requested isolation level not
supported by the server: " + level);
+
break;
default:
throw new SQLException("Invalid transaction isolation level.",
SqlStateCode.INVALID_TRANSACTION_LEVEL);
}
txIsolation = level;
+
+ updateTransactionParameters();
Review Comment:
Should do nothing if level didn't change.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java:
##########
@@ -746,8 +781,80 @@ else if (X.cause(e, IgniteSQLException.class) != null) {
}
}
+ /**
+ * Invokes {@code qry} inside a transaction with id equals to {@code txId}.
+ */
+ private List<FieldsQueryCursor<List<?>>> invokeInTransaction(
+ int txId,
+ boolean autoCommit,
+ SqlFieldsQueryEx qry,
+ GridQueryCancel cancel
+ ) throws IgniteCheckedException {
+ ClientTxContext txCtx = connCtx.txContext(txId);
+
+ if (txCtx == null)
+ throw new IgniteException("Transaction not found [txId=" + txId +
']');
+
+ boolean err = false;
+
+ try {
+ txCtx.acquire(true);
+
+ return invokeOutsideTransaction(qry, cancel);
+ }
+ catch (Exception e) {
+ err = true;
+
+ if (autoCommit)
+ endTransaction(qry, txId, false);
+
+ throw e;
+ }
+ finally {
+ try {
+ txCtx.release(true);
+ }
+ catch (Exception e) {
+ log.warning("Failed to release client transaction context", e);
+ }
+
+ if (autoCommit && !err)
+ endTransaction(qry, txId, true);
+ }
+ }
+
+ /** @return {@code True} if transaction enabled fro connection, {@code
false} otherwise. */
+ private boolean txEnabledForConnection() {
+ return
connCtx.protocolContext().isFeatureSupported(JdbcThinFeature.TX_AWARE_QUERIES)
+ && cliCtx.isolation() != null;
+ }
+
+ /** */
+ private int txId(int txId) {
+ if (txId != NONE_TX || !txEnabledForConnection())
+ return txId;
+
+ return startClientTransaction(
+ connCtx,
+ cliCtx.concurrency(),
+ cliCtx.isolation(),
+ cliCtx.transactionTimeout(),
+ cliCtx.transactionLabel()
+ );
+ }
+
/** */
- private List<FieldsQueryCursor<List<?>>> querySqlFields(SqlFieldsQueryEx
qry, GridQueryCancel cancel) {
+ private void endTransaction(SqlFieldsQueryEx qry, int txId, boolean
committed) throws IgniteCheckedException {
Review Comment:
You don't need qry, only timeout
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]