This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f84f01a915f Add switching transaction rule test case (#34207)
f84f01a915f is described below
commit f84f01a915fff329a322e414fc3506bea355167f
Author: ZhangCheng <[email protected]>
AuthorDate: Mon Dec 30 13:08:45 2024 +0800
Add switching transaction rule test case (#34207)
---
.../SwitchingTransactionRuleTestCase.java | 237 +++++++++++++++++++++
1 file changed, 237 insertions(+)
diff --git
a/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java
b/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java
new file mode 100644
index 00000000000..219cd3b5d0f
--- /dev/null
+++
b/test/e2e/operation/transaction/src/test/java/org/apache/shardingsphere/test/e2e/transaction/cases/alterresource/SwitchingTransactionRuleTestCase.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.transaction.cases.alterresource;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.test.e2e.transaction.cases.base.BaseTransactionTestCase;
+import
org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionContainerComposer;
+import
org.apache.shardingsphere.test.e2e.transaction.engine.base.TransactionTestCase;
+import
org.apache.shardingsphere.test.e2e.transaction.engine.command.CommonSQLCommand;
+import
org.apache.shardingsphere.test.e2e.transaction.engine.constants.TransactionTestConstants;
+import org.apache.shardingsphere.transaction.api.TransactionType;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Switching transaction rule test case.
+ */
+@Slf4j
+@TransactionTestCase(adapters = TransactionTestConstants.PROXY, dbTypes =
TransactionTestConstants.MYSQL)
+public final class SwitchingTransactionRuleTestCase extends
BaseTransactionTestCase {
+
+ private static final int THREAD_SIZE = 1;
+
+ private static final int TRANSACTION_SIZE = 1000;
+
+ private static final int MAX_SWITCH_COUNT = 6;
+
+ private static final AtomicBoolean IS_FINISHED = new AtomicBoolean(false);
+
+ private static final AtomicInteger SWITCH_COUNT = new AtomicInteger();
+
+ public SwitchingTransactionRuleTestCase(final TransactionTestCaseParameter
testCaseParam) {
+ super(testCaseParam);
+ }
+
+ @Override
+ protected void executeTest(final TransactionContainerComposer
containerComposer) throws SQLException {
+ innerRun(containerComposer);
+ }
+
+ @SneakyThrows(InterruptedException.class)
+ private void innerRun(final TransactionContainerComposer
containerComposer) {
+ List<Thread> tasks = new ArrayList<>(THREAD_SIZE);
+ for (int i = 0; i < THREAD_SIZE; i++) {
+ Thread updateThread = new Thread(new
TransactionOperationsTask(getDataSource()));
+ updateThread.start();
+ tasks.add(updateThread);
+ }
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.execute(new AlterTransactionRuleTask(containerComposer,
getBaseTransactionITCase().getCommonSQL()));
+ for (Thread each : tasks) {
+ each.join();
+ }
+ IS_FINISHED.set(true);
+ executor.shutdown();
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ private static class AlterTransactionRuleTask implements Runnable {
+
+ private final TransactionContainerComposer containerComposer;
+
+ private final CommonSQLCommand commonSQL;
+
+ @SneakyThrows({SQLException.class, InterruptedException.class})
+ @Override
+ public void run() {
+ while (!IS_FINISHED.get()) {
+ alterLocalTransactionRule();
+ TimeUnit.SECONDS.sleep(20);
+ alterXaTransactionRule("Narayana");
+ if (SWITCH_COUNT.incrementAndGet() >= MAX_SWITCH_COUNT) {
+ IS_FINISHED.set(true);
+ break;
+ }
+ TimeUnit.SECONDS.sleep(20);
+ }
+ }
+
+ private void alterLocalTransactionRule() throws SQLException {
+ try (Connection connection =
containerComposer.getDataSource().getConnection()) {
+ if (isExpectedTransactionRule(connection,
TransactionType.LOCAL, "")) {
+ return;
+ }
+ String alterLocalTransactionRule =
commonSQL.getAlterLocalTransactionRule();
+ log.info("Alter local transaction rule: {}",
alterLocalTransactionRule);
+ SWITCH_COUNT.getAndIncrement();
+ executeWithLog(connection, alterLocalTransactionRule);
+ }
+ assertTrue(waitExpectedTransactionRule(TransactionType.LOCAL, "",
containerComposer));
+ }
+
+ private void alterXaTransactionRule(final String providerType) throws
SQLException {
+ try (Connection connection =
containerComposer.getDataSource().getConnection()) {
+ if (isExpectedTransactionRule(connection, TransactionType.XA,
providerType)) {
+ return;
+ }
+ String alterXaTransactionRule =
commonSQL.getAlterXATransactionRule().replace("${providerType}", providerType);
+ log.info("Alter XA transaction rule: {}",
alterXaTransactionRule);
+ SWITCH_COUNT.getAndIncrement();
+ executeWithLog(connection, alterXaTransactionRule);
+ }
+ assertTrue(waitExpectedTransactionRule(TransactionType.XA,
providerType, containerComposer));
+ }
+
+ private boolean isExpectedTransactionRule(final Connection connection,
final TransactionType expectedTransType, final String expectedProviderType)
throws SQLException {
+ Map<String, String> transactionRuleMap =
executeShowTransactionRule(connection);
+ return
Objects.equals(transactionRuleMap.get(TransactionTestConstants.DEFAULT_TYPE),
expectedTransType.toString())
+ &&
Objects.equals(transactionRuleMap.get(TransactionTestConstants.PROVIDER_TYPE),
expectedProviderType);
+ }
+
+ private Map<String, String> executeShowTransactionRule(final
Connection connection) throws SQLException {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery("SHOW TRANSACTION
RULE;");
+ Map<String, String> result = new HashMap<>();
+ while (resultSet.next()) {
+ String defaultType =
resultSet.getString(TransactionTestConstants.DEFAULT_TYPE);
+ String providerType =
resultSet.getString(TransactionTestConstants.PROVIDER_TYPE);
+ result.put(TransactionTestConstants.DEFAULT_TYPE, defaultType);
+ result.put(TransactionTestConstants.PROVIDER_TYPE,
providerType);
+ }
+ statement.close();
+ return result;
+ }
+
+ private boolean waitExpectedTransactionRule(final TransactionType
expectedTransType, final String expectedProviderType,
+ final
TransactionContainerComposer containerComposer) throws SQLException {
+ Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).until(() ->
true);
+ try (Connection connection =
containerComposer.getDataSource().getConnection()) {
+ int waitTimes = 0;
+ do {
+ if (isExpectedTransactionRule(connection,
expectedTransType, expectedProviderType)) {
+ return true;
+ }
+ Awaitility.await().pollDelay(2L,
TimeUnit.SECONDS).until(() -> true);
+ waitTimes++;
+ } while (waitTimes <= 3);
+ return false;
+ }
+ }
+ }
+
+ @RequiredArgsConstructor
+ @Getter
+ private static class TransactionOperationsTask implements Runnable {
+
+ private static final AtomicInteger ID_COUNTER = new AtomicInteger();
+
+ private final DataSource dataSource;
+
+ @SneakyThrows(SQLException.class)
+ public void run() {
+ Connection connection = dataSource.getConnection();
+ for (int i = 0; i < TRANSACTION_SIZE; i++) {
+ log.info("Transaction {} start", i);
+ executeOneTransaction(connection);
+ if (IS_FINISHED.get()) {
+ break;
+ }
+ log.info("Transaction {} end", i);
+ }
+ connection.close();
+ }
+
+ private static void executeOneTransaction(final Connection connection)
throws SQLException {
+ boolean isErrorOccured = false;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ try {
+ connection.setAutoCommit(false);
+ int id = ID_COUNTER.incrementAndGet();
+ PreparedStatement insertStatement =
connection.prepareStatement("insert into account(id, balance, transaction_id)
values(?, ?, ?)");
+ insertStatement.setObject(1, id);
+ insertStatement.setObject(2, id);
+ insertStatement.setObject(3, id);
+ insertStatement.execute();
+ PreparedStatement updateStatement =
connection.prepareStatement("update account set balance = balance - 1 where id
= ?");
+ updateStatement.setObject(1, id);
+ updateStatement.execute();
+ PreparedStatement selectStatement =
connection.prepareStatement("select * from account where id = ?");
+ selectStatement.setObject(1, id);
+ selectStatement.executeQuery();
+ PreparedStatement deleteStatement =
connection.prepareStatement("delete from account where id = ?");
+ deleteStatement.setObject(1, id);
+ deleteStatement.execute();
+ Thread.sleep(random.nextInt(900) + 100);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
+ log.error("Execute transaction exception occurred", ex);
+ isErrorOccured = true;
+ connection.rollback();
+ }
+ if (!isErrorOccured) {
+ connection.commit();
+ }
+ connection.setAutoCommit(true);
+ }
+ }
+}