This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 4df81a7 #6872, concurrent insert (#7356)
4df81a7 is described below
commit 4df81a714c966f9362226008ef29b8c0f59da830
Author: Zhang Yonglun <[email protected]>
AuthorDate: Wed Sep 9 18:44:58 2020 +0800
#6872, concurrent insert (#7356)
---
.../env/dataset/DataSetEnvironmentManager.java | 91 +++++++++++++---------
1 file changed, 56 insertions(+), 35 deletions(-)
diff --git
a/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
b/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
index 1ba6408..01510f4 100644
---
a/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
+++
b/shardingsphere-integration-test/shardingsphere-test-suite/src/test/java/org/apache/shardingsphere/dbtest/env/dataset/DataSetEnvironmentManager.java
@@ -66,6 +66,21 @@ public final class DataSetEnvironmentManager {
this.dataSourceMap = dataSourceMap;
}
+ private static String generateTableName(final String tableName, final
DatabaseType databaseType) {
+ switch (databaseType.getName()) {
+ case "H2":
+ case "PostgreSQL":
+ case "Oracle":
+ return "\"" + tableName + "\"";
+ case "MySQL":
+ return "`" + tableName + "`";
+ case "SQLServer":
+ return "[" + tableName + "]";
+ default:
+ throw new UnsupportedOperationException(String.format("Cannot
support database [%s].", databaseType));
+ }
+ }
+
/**
* Initialize data.
*
@@ -74,6 +89,7 @@ public final class DataSetEnvironmentManager {
*/
public void initialize() throws SQLException, ParseException {
Map<DataNode, List<DataSetRow>> dataNodeListMap = getDataSetRowMap();
+ List<Callable<Void>> insertTasks = new LinkedList<>();
for (Entry<DataNode, List<DataSetRow>> entry :
dataNodeListMap.entrySet()) {
DataNode dataNode = entry.getKey();
List<DataSetRow> dataSetRows = entry.getValue();
@@ -82,11 +98,17 @@ public final class DataSetEnvironmentManager {
for (DataSetRow row : dataSetRows) {
sqlValueGroups.add(new SQLValueGroup(dataSetMetadata,
row.getValues()));
}
+ String insertSQL;
try (Connection connection =
dataSourceMap.get(dataNode.getDataSourceName()).getConnection()) {
- String insertSQL =
generateInsertSQL(generateTableName(dataNode.getTableName(),
-
DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL())),
dataSetMetadata.getColumns());
- executeBatch(connection, insertSQL, sqlValueGroups);
+ insertSQL =
generateInsertSQL(generateTableName(dataNode.getTableName(),
DatabaseTypes.getDatabaseTypeByURL(connection.getMetaData().getURL())),
dataSetMetadata.getColumns());
}
+ insertTasks.add(new
InsertTask(dataSourceMap.get(dataNode.getDataSourceName()), insertSQL,
sqlValueGroups));
+ }
+ try {
+
SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(insertTasks);
+ // CHECKSTYLE:OFF
+ } catch (final Exception ex) {
+ // CHECKSTYLE:ON
}
}
@@ -112,33 +134,17 @@ public final class DataSetEnvironmentManager {
return String.format("INSERT INTO %s (%s) VALUES (%s)", tableName,
Joiner.on(",").join(columnNames), Joiner.on(",").join(placeholders));
}
- private void executeBatch(final Connection connection, final String sql,
final List<SQLValueGroup> sqlValueGroups) throws SQLException {
- try (PreparedStatement preparedStatement =
connection.prepareStatement(sql)) {
- for (SQLValueGroup each : sqlValueGroups) {
- setParameters(preparedStatement, each);
- preparedStatement.addBatch();
- }
- preparedStatement.executeBatch();
- }
- }
-
- private void setParameters(final PreparedStatement preparedStatement,
final SQLValueGroup sqlValueGroup) throws SQLException {
- for (SQLValue each : sqlValueGroup.getSqlValues()) {
- preparedStatement.setObject(each.getIndex(), each.getValue());
- }
- }
-
/**
* Clear data.
*
*/
public void clear() {
- List<Callable<Void>> clearTasks = new LinkedList<>();
+ List<Callable<Void>> deleteTasks = new LinkedList<>();
for (Entry<String, Collection<String>> entry :
getDataNodeMap().entrySet()) {
- clearTasks.add(new ClearTask(dataSourceMap.get(entry.getKey()),
entry.getValue()));
+ deleteTasks.add(new DeleteTask(dataSourceMap.get(entry.getKey()),
entry.getValue()));
}
try {
-
SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(clearTasks);
+
SHARDING_SPHERE_EXECUTOR_SERVICE.getExecutorService().invokeAll(deleteTasks);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
@@ -170,23 +176,38 @@ public final class DataSetEnvironmentManager {
return result;
}
- private static String generateTableName(final String tableName, final
DatabaseType databaseType) {
- switch (databaseType.getName()) {
- case "H2":
- case "PostgreSQL":
- case "Oracle":
- return "\"" + tableName + "\"";
- case "MySQL":
- return "`" + tableName + "`";
- case "SQLServer":
- return "[" + tableName + "]";
- default:
- throw new UnsupportedOperationException(String.format("Cannot
support database [%s].", databaseType));
+ @RequiredArgsConstructor
+ private static class InsertTask implements Callable<Void> {
+
+ private final DataSource dataSource;
+
+ private final String insertSQL;
+
+ private final Collection<SQLValueGroup> sqlValueGroups;
+
+ @Override
+ public Void call() throws SQLException {
+ try (Connection connection = dataSource.getConnection()) {
+ try (PreparedStatement preparedStatement =
connection.prepareStatement(insertSQL)) {
+ for (SQLValueGroup each : sqlValueGroups) {
+ setParameters(preparedStatement, each);
+ preparedStatement.addBatch();
+ }
+ preparedStatement.executeBatch();
+ }
+ }
+ return null;
+ }
+
+ private void setParameters(final PreparedStatement preparedStatement,
final SQLValueGroup sqlValueGroup) throws SQLException {
+ for (SQLValue each : sqlValueGroup.getSqlValues()) {
+ preparedStatement.setObject(each.getIndex(), each.getValue());
+ }
}
}
@RequiredArgsConstructor
- private static class ClearTask implements Callable<Void> {
+ private static class DeleteTask implements Callable<Void> {
private final DataSource dataSource;