This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f4b22c3 Overwrite InventoryDataTaskSplitter.splitByPrimaryKeyRange (#8422)
f4b22c3 is described below
commit f4b22c3d2987b8cca86778ca32b98c6091d1f1e9
Author: 邱鹿 Lucas <[email protected]>
AuthorDate: Tue Dec 1 11:39:48 2020 +0800
Overwrite InventoryDataTaskSplitter.splitByPrimaryKeyRange (#8422)
* Overwrite InventoryDataTaskSplitter.splitByPrimaryKeyRange()
* Rename SQLBuilder to ScalingSQLBuilder
Co-authored-by: qiulu3 <Lucas209910>
---
.../scaling/fixture/FixtureH2ScalingEntry.java | 6 ++
.../core/config/InventoryDumperConfiguration.java | 2 +-
.../scaling/core/config/JobConfiguration.java | 2 +
.../scaling/core/config/SyncConfiguration.java | 2 +-
.../executor/importer/AbstractJDBCImporter.java | 13 ++--
.../AbstractScalingSQLBuilder.java} | 39 ++++--------
.../executor/sqlbuilder/ScalingSQLBuilder.java | 72 ++++++++++++++++++++++
.../sqlbuilder/ScalingSQLBuilderFactory.java | 43 +++++++++++++
.../job/check/AbstractDataConsistencyChecker.java | 4 +-
.../job/preparer/ShardingScalingJobPreparer.java | 8 +--
.../job/preparer/resumer/SyncPositionResumer.java | 4 +-
.../splitter/InventoryDataTaskSplitter.java | 54 ++++++++--------
.../task/inventory/InventoryDataScalingTask.java | 2 +-
.../scaling/core/spi/ScalingEntry.java | 8 +++
.../scaling/core/utils/SyncConfigurationUtil.java | 2 +-
.../importer/AbstractJDBCImporterTest.java | 15 ++---
.../executor/importer/AbstractSqlBuilderTest.java | 31 +++-------
.../fixture/FixtureDataConsistencyChecker.java | 16 +----
.../core/fixture/FixtureH2ScalingEntry.java | 6 ++
.../core/fixture/FixtureScalingSQLBuilder.java} | 34 +++++-----
.../splitter/InventoryDataTaskSplitterTest.java | 36 +++++++----
.../inventory/InventoryDataScalingTaskTest.java | 3 +-
.../scaling/mysql/MySQLScalingEntry.java | 7 +++
.../component/MySQLDataConsistencyChecker.java | 4 +-
.../scaling/mysql/component/MySQLImporter.java | 6 +-
...SQLBuilder.java => MySQLScalingSQLBuilder.java} | 7 ++-
...erTest.java => MySQLScalingSQLBuilderTest.java} | 4 +-
.../scaling/postgresql/PostgreSQLScalingEntry.java | 7 +++
.../PostgreSQLDataConsistencyChecker.java | 6 +-
.../postgresql/component/PostgreSQLImporter.java | 6 +-
...ilder.java => PostgreSQLScalingSQLBuilder.java} | 7 ++-
...t.java => PostgreSQLScalingSQLBuilderTest.java} | 4 +-
32 files changed, 295 insertions(+), 165 deletions(-)
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
index 9996ecb..bd86ae5 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-bootstrap/src/test/java/org/apache/shardingsphere/scaling/fixture/FixtureH2ScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.fixture;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -58,6 +59,11 @@ public final class FixtureH2ScalingEntry implements
ScalingEntry {
}
@Override
+ public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+ return null;
+ }
+
+ @Override
public String getDatabaseType() {
return "H2";
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
index d004c52..839bcd0 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
@@ -31,7 +31,7 @@ public final class InventoryDumperConfiguration extends
DumperConfiguration {
private String primaryKey;
- private Integer spiltNum;
+ private Integer shardingItem;
public InventoryDumperConfiguration(final DumperConfiguration
dumperConfig) {
setDataSourceName(dumperConfig.getDataSourceName());
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
index 8fb2f35..cb6bc0d 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/JobConfiguration.java
@@ -37,5 +37,7 @@ public final class JobConfiguration {
private int shardingItem;
+ private int shardingSize = 10000000;
+
private boolean running = true;
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
index abffdcc..566c29b 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/SyncConfiguration.java
@@ -27,7 +27,7 @@ import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public final class SyncConfiguration {
- private final int concurrency;
+ private final JobConfiguration jobConfig;
private final DumperConfiguration dumperConfig;
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index ce0f5c5..9f21632 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRe
import
org.apache.shardingsphere.scaling.core.execute.executor.record.GroupedDataRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import javax.sql.DataSource;
import java.sql.Connection;
@@ -54,7 +55,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
private final DataSourceManager dataSourceManager;
- private final AbstractSQLBuilder sqlBuilder;
+ private final ScalingSQLBuilder scalingSqlBuilder;
@Setter
private Channel channel;
@@ -62,7 +63,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
protected AbstractJDBCImporter(final ImporterConfiguration importerConfig,
final DataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
- sqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
+ scalingSqlBuilder =
createSQLBuilder(importerConfig.getShardingColumnsMap());
}
/**
@@ -71,7 +72,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
* @param shardingColumnsMap sharding columns map
* @return SQL builder
*/
- protected abstract AbstractSQLBuilder createSQLBuilder(Map<String,
Set<String>> shardingColumnsMap);
+ protected abstract ScalingSQLBuilder createSQLBuilder(Map<String,
Set<String>> shardingColumnsMap);
@Override
public final void start() {
@@ -153,7 +154,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
}
private void executeBatchInsert(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
- String insertSql = sqlBuilder.buildInsertSQL(dataRecords.get(0));
+ String insertSql =
scalingSqlBuilder.buildInsertSQL(dataRecords.get(0));
PreparedStatement ps = connection.prepareStatement(insertSql);
ps.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
@@ -174,7 +175,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
private void executeUpdate(final Connection connection, final DataRecord
record) throws SQLException {
List<Column> conditionColumns =
RecordUtil.extractConditionColumns(record,
importerConfig.getShardingColumnsMap().get(record.getTableName()));
List<Column> updatedColumns = RecordUtil.extractUpdatedColumns(record);
- String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
+ String updateSql = scalingSqlBuilder.buildUpdateSQL(record,
conditionColumns);
PreparedStatement ps = connection.prepareStatement(updateSql);
for (int i = 0; i < updatedColumns.size(); i++) {
ps.setObject(i + 1, updatedColumns.get(i).getValue());
@@ -190,7 +191,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
private void executeBatchDelete(final Connection connection, final
List<DataRecord> dataRecords) throws SQLException {
List<Column> conditionColumns =
RecordUtil.extractConditionColumns(dataRecords.get(0),
importerConfig.getShardingColumnsMap().get(dataRecords.get(0).getTableName()));
- String deleteSQL = sqlBuilder.buildDeleteSQL(dataRecords.get(0),
conditionColumns);
+ String deleteSQL =
scalingSqlBuilder.buildDeleteSQL(dataRecords.get(0), conditionColumns);
PreparedStatement ps = connection.prepareStatement(deleteSQL);
ps.setQueryTimeout(30);
for (DataRecord each : dataRecords) {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
similarity index 90%
rename from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
rename to
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
index d40e47a..6f9edb742 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/AbstractScalingSQLBuilder.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+package org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder;
import com.google.common.collect.Collections2;
import lombok.AccessLevel;
@@ -35,7 +35,7 @@ import java.util.concurrent.ConcurrentMap;
* Abstract SQL builder.
*/
@RequiredArgsConstructor
-public abstract class AbstractSQLBuilder {
+public abstract class AbstractScalingSQLBuilder implements ScalingSQLBuilder {
private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
@@ -72,12 +72,7 @@ public abstract class AbstractSQLBuilder {
return new
StringBuilder().append(getLeftIdentifierQuoteString()).append(item).append(getRightIdentifierQuoteString());
}
- /**
- * Build insert SQL.
- *
- * @param dataRecord data record
- * @return insert SQL
- */
+ @Override
public String buildInsertSQL(final DataRecord dataRecord) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -98,13 +93,7 @@ public abstract class AbstractSQLBuilder {
return String.format("INSERT INTO %s(%s) VALUES(%s)",
quote(tableName), columnsLiteral, holder);
}
- /**
- * Build update SQL.
- *
- * @param dataRecord data record
- * @param conditionColumns condition columns
- * @return update SQL
- */
+ @Override
public String buildUpdateSQL(final DataRecord dataRecord, final
Collection<Column> conditionColumns) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -126,13 +115,7 @@ public abstract class AbstractSQLBuilder {
return Collections2.filter(columns, Column::isUpdated);
}
- /**
- * Build delete SQL.
- *
- * @param dataRecord data record
- * @param conditionColumns condition columns
- * @return delete SQL
- */
+ @Override
public String buildDeleteSQL(final DataRecord dataRecord, final
Collection<Column> conditionColumns) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
@@ -154,13 +137,13 @@ public abstract class AbstractSQLBuilder {
return where.toString();
}
- /**
- * Build count SQL.
- *
- * @param tableName table name
- * @return count SQL
- */
+ @Override
public String buildCountSQL(final String tableName) {
return String.format("SELECT COUNT(*) FROM %s", quote(tableName));
}
+
+ @Override
+ public String buildSplitByPrimaryKeyRangeSQL(final String tableName, final
String primaryKey) {
+ return String.format("SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE
%s>=? limit ?) t", quote(primaryKey), quote(primaryKey), quote(tableName),
quote(primaryKey));
+ }
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
new file mode 100644
index 0000000..09c98fa
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder;
+
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+
+import java.util.Collection;
+
+/**
+ * Scaling SQL builder.
+ */
+public interface ScalingSQLBuilder {
+
+ /**
+ * Build insert SQL.
+ *
+ * @param dataRecord data record
+ * @return insert SQL
+ */
+ String buildInsertSQL(DataRecord dataRecord);
+
+ /**
+ * Build update SQL.
+ *
+ * @param dataRecord data record
+ * @param conditionColumns condition columns
+ * @return update SQL
+ */
+ String buildUpdateSQL(DataRecord dataRecord, Collection<Column>
conditionColumns);
+
+ /**
+ * Build delete SQL.
+ *
+ * @param dataRecord data record
+ * @param conditionColumns condition columns
+ * @return delete SQL
+ */
+ String buildDeleteSQL(DataRecord dataRecord, Collection<Column>
conditionColumns);
+
+ /**
+ * Build count SQL.
+ *
+ * @param tableName table name
+ * @return count SQL
+ */
+ String buildCountSQL(String tableName);
+
+ /**
+ * Build split by primary key range SQL.
+ *
+ * @param tableName table name
+ * @param primaryKey primary key
+ * @return split SQL
+ */
+ String buildSplitByPrimaryKeyRangeSQL(String tableName, String primaryKey);
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilderFactory.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilderFactory.java
new file mode 100644
index 0000000..40b651c
--- /dev/null
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/sqlbuilder/ScalingSQLBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder;
+
+import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntry;
+import org.apache.shardingsphere.scaling.core.spi.ScalingEntryLoader;
+
+import java.util.Map;
+
+/**
+ * Scaling SQL builder factory.
+ */
+public final class ScalingSQLBuilderFactory {
+
+ /**
+ * New instance of SQL builder.
+ *
+ * @param databaseType database type
+ * @return SQL builder
+ */
+ @SneakyThrows(ReflectiveOperationException.class)
+ public static ScalingSQLBuilder newInstance(final String databaseType) {
+ ScalingEntry scalingEntry =
ScalingEntryLoader.getScalingEntryByDatabaseType(databaseType);
+ return
scalingEntry.getSQLBuilderClass().getConstructor(Map.class).newInstance(Maps.newHashMap());
+ }
+}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
index a5efe04..49f8cb9 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/check/AbstractDataConsistencyChecker.java
@@ -23,7 +23,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceFactory;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
import org.apache.shardingsphere.scaling.core.exception.DataCheckFailException;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import javax.sql.DataSource;
@@ -85,5 +85,5 @@ public abstract class AbstractDataConsistencyChecker
implements DataConsistencyC
return
dataSourceFactory.newInstance(shardingScalingJob.getScalingConfig().getRuleConfiguration().getTarget().unwrap());
}
- protected abstract AbstractSQLBuilder getSqlBuilder();
+ protected abstract ScalingSQLBuilder getSqlBuilder();
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
index 50dcfff..7f3fb4c 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/ShardingScalingJobPreparer.java
@@ -68,7 +68,7 @@ public final class ShardingScalingJobPreparer {
syncPositionResumer.resumePosition(shardingScalingJob,
dataSourceManager, resumeBreakPointManager);
} else {
initIncrementalDataTasks(databaseType, shardingScalingJob,
dataSourceManager);
- initInventoryDataTasks(shardingScalingJob, dataSourceManager);
+ initInventoryDataTasks(databaseType, shardingScalingJob,
dataSourceManager);
syncPositionResumer.persistPosition(shardingScalingJob,
resumeBreakPointManager);
}
shardingScalingJob.setDataConsistencyChecker(initDataConsistencyChecker(databaseType,
shardingScalingJob));
@@ -90,10 +90,10 @@ public final class ShardingScalingJobPreparer {
dataSourceChecker.checkVariable(dataSourceManager.getSourceDataSources().values());
}
- private void initInventoryDataTasks(final ShardingScalingJob
shardingScalingJob, final DataSourceManager dataSourceManager) {
+ private void initInventoryDataTasks(final String databaseType, final
ShardingScalingJob shardingScalingJob, final DataSourceManager
dataSourceManager) {
List<ScalingTask> allInventoryDataTasks = new LinkedList<>();
for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
-
allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(each,
dataSourceManager));
+
allInventoryDataTasks.addAll(inventoryDataTaskSplitter.splitInventoryData(databaseType,
each, dataSourceManager));
}
shardingScalingJob.getInventoryDataTasks().addAll(allInventoryDataTasks);
}
@@ -102,7 +102,7 @@ public final class ShardingScalingJobPreparer {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
DataSourceConfiguration dataSourceConfig =
each.getDumperConfig().getDataSourceConfig();
each.getDumperConfig().setPositionManager(PositionManagerFactory.newInstance(databaseType,
dataSourceManager.getDataSource(dataSourceConfig)));
-
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(),
each.getDumperConfig(), each.getImporterConfig()));
+
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getJobConfig().getConcurrency(),
each.getDumperConfig(), each.getImporterConfig()));
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
index a9e6d51..f64ce7a 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/resumer/SyncPositionResumer.java
@@ -78,7 +78,7 @@ public final class SyncPositionResumer {
result.setTableName(splitTable[0].split("\\.")[1]);
result.setPositionManager(entry.getValue());
if (2 == splitTable.length) {
- result.setSpiltNum(Integer.parseInt(splitTable[1]));
+ result.setShardingItem(Integer.parseInt(splitTable[1]));
}
result.setPrimaryKey(metaDataManager.getTableMetaData(result.getTableName()).getPrimaryKeyColumns().get(0));
return result;
@@ -94,7 +94,7 @@ public final class SyncPositionResumer {
private void resumeIncrementalPosition(final ShardingScalingJob
shardingScalingJob, final ResumeBreakPointManager resumeBreakPointManager) {
for (SyncConfiguration each : shardingScalingJob.getSyncConfigs()) {
each.getDumperConfig().setPositionManager(resumeBreakPointManager.getIncrementalPositionManagerMap().get(each.getDumperConfig().getDataSourceName()));
-
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getConcurrency(),
each.getDumperConfig(), each.getImporterConfig()));
+
shardingScalingJob.getIncrementalDataTasks().add(syncTaskFactory.createIncrementalDataSyncTask(each.getJobConfig().getConcurrency(),
each.getDumperConfig(), each.getImporterConfig()));
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
index b522cc6..3b66c38 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitter.java
@@ -24,6 +24,8 @@ import
org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguratio
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.exception.PrepareFailedException;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilderFactory;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
@@ -53,25 +55,27 @@ public final class InventoryDataTaskSplitter {
/**
* Split inventory data to multi-tasks.
*
+ * @param databaseType database type
* @param syncConfig synchronize configuration
* @param dataSourceManager data source manager
* @return split inventory data task
*/
- public Collection<ScalingTask> splitInventoryData(final SyncConfiguration
syncConfig, final DataSourceManager dataSourceManager) {
+ public Collection<ScalingTask> splitInventoryData(final String
databaseType, final SyncConfiguration syncConfig, final DataSourceManager
dataSourceManager) {
Collection<ScalingTask> result = new LinkedList<>();
- for (InventoryDumperConfiguration each :
splitDumperConfig(syncConfig.getConcurrency(), syncConfig.getDumperConfig(),
dataSourceManager)) {
+ for (InventoryDumperConfiguration each :
splitDumperConfig(databaseType, syncConfig.getJobConfig().getShardingSize(),
syncConfig.getDumperConfig(), dataSourceManager)) {
result.add(syncTaskFactory.createInventoryDataSyncTask(each,
syncConfig.getImporterConfig()));
}
return result;
}
- private Collection<InventoryDumperConfiguration> splitDumperConfig(final
int concurrency, final DumperConfiguration dumperConfig, final
DataSourceManager dataSourceManager) {
+ private Collection<InventoryDumperConfiguration> splitDumperConfig(
+ final String databaseType, final int shardingSize, final
DumperConfiguration dumperConfig, final DataSourceManager dataSourceManager) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
MetaDataManager metaDataManager = new MetaDataManager(dataSource);
for (InventoryDumperConfiguration each : splitByTable(dumperConfig)) {
if (isSpiltByPrimaryKeyRange(each, metaDataManager)) {
- result.addAll(splitByPrimaryKeyRange(concurrency, each,
metaDataManager, dataSource));
+ result.addAll(splitByPrimaryKeyRange(databaseType,
shardingSize, each, metaDataManager, dataSource));
} else {
result.add(each);
}
@@ -117,31 +121,33 @@ public final class InventoryDataTaskSplitter {
return Types.INTEGER != columnType && Types.BIGINT != columnType &&
Types.SMALLINT != columnType && Types.TINYINT != columnType;
}
- private Collection<InventoryDumperConfiguration>
splitByPrimaryKeyRange(final int concurrency, final
InventoryDumperConfiguration inventoryDumperConfig,
-
final MetaDataManager metaDataManager, final DataSource dataSource) {
+ private Collection<InventoryDumperConfiguration> splitByPrimaryKeyRange(
+ final String databaseType, final int shardingSize, final
InventoryDumperConfiguration inventoryDumperConfig, final MetaDataManager
metaDataManager, final DataSource dataSource) {
Collection<InventoryDumperConfiguration> result = new LinkedList<>();
String tableName = inventoryDumperConfig.getTableName();
String primaryKey =
metaDataManager.getTableMetaData(tableName).getPrimaryKeyColumns().get(0);
+ ScalingSQLBuilder scalingSqlBuilder =
ScalingSQLBuilderFactory.newInstance(databaseType);
inventoryDumperConfig.setPrimaryKey(primaryKey);
- try (Connection connection = dataSource.getConnection()) {
- PreparedStatement ps =
connection.prepareStatement(String.format("SELECT MIN(%s),MAX(%s) FROM %s LIMIT
1", primaryKey, primaryKey, inventoryDumperConfig.getTableName()));
- ResultSet rs = ps.executeQuery();
- rs.next();
- long min = rs.getLong(1);
- long max = rs.getLong(2);
- long step = (max - min) / concurrency;
- for (int i = 0; i < concurrency && min <= max; i++) {
- InventoryDumperConfiguration splitDumperConfig = new
InventoryDumperConfiguration(inventoryDumperConfig);
- if (i < concurrency - 1) {
- splitDumperConfig.setPositionManager(new
PositionManager(new PrimaryKeyPosition(min, min + step)));
- min += step + 1;
- } else {
- splitDumperConfig.setPositionManager(new
PositionManager(new PrimaryKeyPosition(min, max)));
+ try (Connection connection = dataSource.getConnection();
+ PreparedStatement ps =
connection.prepareStatement(scalingSqlBuilder.buildSplitByPrimaryKeyRangeSQL(tableName,
primaryKey))) {
+ long beginId = 0;
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ ps.setLong(1, beginId);
+ ps.setLong(2, shardingSize);
+ try (ResultSet rs = ps.executeQuery()) {
+ rs.next();
+ long endId = rs.getLong(1);
+ if (endId == 0) {
+ break;
+ }
+ InventoryDumperConfiguration splitDumperConfig = new
InventoryDumperConfiguration(inventoryDumperConfig);
+ splitDumperConfig.setPositionManager(new
PositionManager(new PrimaryKeyPosition(beginId, endId)));
+ splitDumperConfig.setShardingItem(i);
+ splitDumperConfig.setPrimaryKey(primaryKey);
+ splitDumperConfig.setTableName(tableName);
+ result.add(splitDumperConfig);
+ beginId = endId + 1;
}
- splitDumperConfig.setSpiltNum(i);
- splitDumperConfig.setPrimaryKey(primaryKey);
- splitDumperConfig.setTableName(tableName);
- result.add(splitDumperConfig);
}
} catch (final SQLException ex) {
throw new PrepareFailedException(String.format("Split task for
table %s by primary key %s error", inventoryDumperConfig.getTableName(),
primaryKey), ex);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
index 885b4c6..ca8cac7 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTask.java
@@ -68,7 +68,7 @@ public final class InventoryDataScalingTask extends
AbstractShardingScalingExecu
private String generateSyncTaskId(final InventoryDumperConfiguration
inventoryDumperConfig) {
String result = String.format("%s.%s",
inventoryDumperConfig.getDataSourceName(),
inventoryDumperConfig.getTableName());
- return null == inventoryDumperConfig.getSpiltNum() ? result : result +
"#" + inventoryDumperConfig.getSpiltNum();
+ return null == inventoryDumperConfig.getShardingItem() ? result :
result + "#" + inventoryDumperConfig.getShardingItem();
}
@Override
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
index c765215..692504e 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/spi/ScalingEntry.java
@@ -21,6 +21,7 @@ import
org.apache.shardingsphere.infra.database.type.DatabaseTypeAwareSPI;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -71,4 +72,11 @@ public interface ScalingEntry extends DatabaseTypeAwareSPI {
* @return data consistency checker type
*/
Class<? extends DataConsistencyChecker> getDataConsistencyCheckerClass();
+
+ /**
+ * Get SQL builder class.
+ *
+ * @return SQL builder type
+ */
+ Class<? extends ScalingSQLBuilder> getSQLBuilderClass();
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
index 6d00841..76f74ee 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/utils/SyncConfigurationUtil.java
@@ -83,7 +83,7 @@ public final class SyncConfigurationUtil {
for (Entry<String, Map<String, String>> entry :
dataSourceTableNameMap.entrySet()) {
DumperConfiguration dumperConfig =
createDumperConfig(entry.getKey(),
sourceDataSource.get(entry.getKey()).getProps(), entry.getValue());
ImporterConfiguration importerConfig =
createImporterConfig(scalingConfig, shardingColumnsMap);
- result.add(new
SyncConfiguration(scalingConfig.getJobConfiguration().getConcurrency(),
dumperConfig, importerConfig));
+ result.add(new
SyncConfiguration(scalingConfig.getJobConfiguration(), dumperConfig,
importerConfig));
}
return result;
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index fc25ab4..4d19ce5 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord
import
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
import org.junit.Before;
import org.junit.Test;
@@ -66,7 +67,7 @@ public final class AbstractJDBCImporterTest {
private DataSourceManager dataSourceManager;
@Mock
- private AbstractSQLBuilder sqlBuilder;
+ private ScalingSQLBuilder scalingSqlBuilder;
@Mock
private DataSourceConfiguration dataSourceConfig;
@@ -90,8 +91,8 @@ public final class AbstractJDBCImporterTest {
jdbcImporter = new AbstractJDBCImporter(mockImporterConfiguration(),
dataSourceManager) {
@Override
- protected AbstractSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
- return sqlBuilder;
+ protected ScalingSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
+ return scalingSqlBuilder;
}
};
jdbcImporter.setChannel(channel);
@@ -102,7 +103,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertWriteInsertDataRecord() throws SQLException {
DataRecord insertRecord = getDataRecord("INSERT");
- when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
+
when(scalingSqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(),
anyInt())).thenReturn(mockRecords(insertRecord));
jdbcImporter.run();
@@ -115,7 +116,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord = getDataRecord("DELETE");
- when(sqlBuilder.buildDeleteSQL(deleteRecord,
mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
+ when(scalingSqlBuilder.buildDeleteSQL(deleteRecord,
mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(),
anyInt())).thenReturn(mockRecords(deleteRecord));
jdbcImporter.run();
@@ -127,7 +128,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord = getDataRecord("UPDATE");
- when(sqlBuilder.buildUpdateSQL(updateRecord,
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
+ when(scalingSqlBuilder.buildUpdateSQL(updateRecord,
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(),
anyInt())).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
@@ -141,7 +142,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertUpdatePrimaryKeyDataRecord() throws SQLException {
DataRecord updateRecord = getUpdatePrimaryKeyDataRecord();
- when(sqlBuilder.buildUpdateSQL(updateRecord,
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
+ when(scalingSqlBuilder.buildUpdateSQL(updateRecord,
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(anyInt(),
anyInt())).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index bb86ec7..43d1b33 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Sets;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
+import org.apache.shardingsphere.scaling.core.fixture.FixtureScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.position.PlaceholderPosition;
-import org.junit.Before;
import org.junit.Test;
import java.util.Collection;
@@ -33,53 +34,37 @@ import static org.junit.Assert.assertThat;
public final class AbstractSqlBuilderTest {
- private AbstractSQLBuilder sqlBuilder;
-
- @Before
- public void setUp() {
- sqlBuilder = new AbstractSQLBuilder(Maps.newHashMap()) {
-
- @Override
- protected String getLeftIdentifierQuoteString() {
- return "`";
- }
-
- @Override
- protected String getRightIdentifierQuoteString() {
- return "`";
- }
- };
- }
+ private final ScalingSQLBuilder scalingSqlBuilder = new
FixtureScalingSQLBuilder(Maps.newHashMap());
@Test
public void assertBuildInsertSQL() {
- String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+ String actual = scalingSqlBuilder.buildInsertSQL(mockDataRecord("t1"));
assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`)
VALUES(?,?,?,?,?)"));
}
@Test
public void assertBuildUpdateSQLWithPrimaryKey() {
- String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"),
RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
+ String actual = scalingSqlBuilder.buildUpdateSQL(mockDataRecord("t2"),
RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ?
WHERE `id` = ?"));
}
@Test
public void assertBuildUpdateSQLWithShardingColumns() {
DataRecord dataRecord = mockDataRecord("t2");
- String actual = sqlBuilder.buildUpdateSQL(dataRecord,
mockConditionColumns(dataRecord));
+ String actual = scalingSqlBuilder.buildUpdateSQL(dataRecord,
mockConditionColumns(dataRecord));
assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ?
WHERE `id` = ? and `sc` = ?"));
}
@Test
public void assertBuildDeleteSQLWithPrimaryKey() {
- String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"),
RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
+ String actual = scalingSqlBuilder.buildDeleteSQL(mockDataRecord("t3"),
RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ?"));
}
@Test
public void assertBuildDeleteSQLWithConditionColumns() {
DataRecord dataRecord = mockDataRecord("t3");
- String actual = sqlBuilder.buildDeleteSQL(dataRecord,
mockConditionColumns(dataRecord));
+ String actual = scalingSqlBuilder.buildDeleteSQL(dataRecord,
mockConditionColumns(dataRecord));
assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
index 243b5d3..b35181d 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.core.fixture;
import com.google.common.collect.Maps;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import
org.apache.shardingsphere.scaling.core.job.check.AbstractDataConsistencyChecker;
import
org.apache.shardingsphere.scaling.core.job.check.DataConsistencyCheckResult;
@@ -44,17 +44,7 @@ public final class FixtureDataConsistencyChecker extends
AbstractDataConsistency
}
@Override
- protected AbstractSQLBuilder getSqlBuilder() {
- return new AbstractSQLBuilder(Maps.newHashMap()) {
- @Override
- protected String getLeftIdentifierQuoteString() {
- return "`";
- }
-
- @Override
- protected String getRightIdentifierQuoteString() {
- return "`";
- }
- };
+ protected ScalingSQLBuilder getSqlBuilder() {
+ return new FixtureScalingSQLBuilder(Maps.newHashMap());
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
index 3c47c07..f834cbc 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureH2ScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.core.fixture;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -58,6 +59,11 @@ public final class FixtureH2ScalingEntry implements
ScalingEntry {
}
@Override
+ public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+ return FixtureScalingSQLBuilder.class;
+ }
+
+ @Override
public String getDatabaseType() {
return "H2";
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingSQLBuilder.java
similarity index 53%
copy from
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
copy to
shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingSQLBuilder.java
index d004c52..1ed5372 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/config/InventoryDumperConfiguration.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureScalingSQLBuilder.java
@@ -15,27 +15,27 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.core.config;
+package org.apache.shardingsphere.scaling.core.fixture;
-import lombok.Getter;
-import lombok.Setter;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.AbstractScalingSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
-/**
- * Inventory dumper configuration.
- */
-@Getter
-@Setter
-public final class InventoryDumperConfiguration extends DumperConfiguration {
-
- private String tableName;
+import java.util.Map;
+import java.util.Set;
+
+public final class FixtureScalingSQLBuilder extends AbstractScalingSQLBuilder
implements ScalingSQLBuilder {
- private String primaryKey;
+ public FixtureScalingSQLBuilder(final Map<String, Set<String>>
shardingColumnsMap) {
+ super(shardingColumnsMap);
+ }
- private Integer spiltNum;
+ @Override
+ protected String getLeftIdentifierQuoteString() {
+ return "`";
+ }
- public InventoryDumperConfiguration(final DumperConfiguration
dumperConfig) {
- setDataSourceName(dumperConfig.getDataSourceName());
- setDataSourceConfig(dumperConfig.getDataSourceConfig());
- setTableNameMap(dumperConfig.getTableNameMap());
+ @Override
+ protected String getRightIdentifierQuoteString() {
+ return "`";
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
index 9a61cb8..f3d75a6 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/preparer/splitter/InventoryDataTaskSplitterTest.java
@@ -19,10 +19,12 @@ package
org.apache.shardingsphere.scaling.core.job.preparer.splitter;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
-import
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
-import
org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import org.apache.shardingsphere.scaling.core.config.SyncConfiguration;
+import
org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
+import
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import org.apache.shardingsphere.scaling.core.job.position.PrimaryKeyPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.junit.After;
import org.junit.Before;
@@ -34,6 +36,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
@@ -48,6 +51,8 @@ public final class InventoryDataTaskSplitterTest {
private static final String PASSWORD = "password";
+ private static final String DATABASE_TYPE = "H2";
+
private SyncConfiguration syncConfig;
private DataSourceManager dataSourceManager;
@@ -58,7 +63,7 @@ public final class InventoryDataTaskSplitterTest {
public void setUp() {
DumperConfiguration dumperConfig = mockDumperConfig();
ImporterConfiguration importerConfig = new ImporterConfiguration();
- syncConfig = new SyncConfiguration(3, dumperConfig, importerConfig);
+ syncConfig = new SyncConfiguration(new JobConfiguration(),
dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager();
inventoryDataTaskSplitter = new InventoryDataTaskSplitter();
}
@@ -70,16 +75,19 @@ public final class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithIntPrimary() throws SQLException {
+ syncConfig.getJobConfig().setShardingSize(10);
initIntPrimaryEnvironment(syncConfig.getDumperConfig());
- Collection<ScalingTask> actual =
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+ List<ScalingTask> actual = (List<ScalingTask>)
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig,
dataSourceManager);
assertNotNull(actual);
- assertThat(actual.size(), is(3));
+ assertThat(actual.size(), is(10));
+ assertThat(((PrimaryKeyPosition)
actual.get(9).getPositionManager().getPosition()).getBeginValue(), is(91L));
+ assertThat(((PrimaryKeyPosition)
actual.get(9).getPositionManager().getPosition()).getEndValue(), is(100L));
}
@Test
public void assertSplitInventoryDataWithCharPrimary() throws SQLException {
initCharPrimaryEnvironment(syncConfig.getDumperConfig());
- Collection<ScalingTask> actual =
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+ Collection<ScalingTask> actual =
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig,
dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -87,7 +95,7 @@ public final class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithUnionPrimary() throws SQLException
{
initUnionPrimaryEnvironment(syncConfig.getDumperConfig());
- Collection<ScalingTask> actual =
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+ Collection<ScalingTask> actual =
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig,
dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -95,7 +103,7 @@ public final class InventoryDataTaskSplitterTest {
@Test
public void assertSplitInventoryDataWithoutPrimary() throws SQLException {
initNoPrimaryEnvironment(syncConfig.getDumperConfig());
- Collection<ScalingTask> actual =
inventoryDataTaskSplitter.splitInventoryData(syncConfig, dataSourceManager);
+ Collection<ScalingTask> actual =
inventoryDataTaskSplitter.splitInventoryData(DATABASE_TYPE, syncConfig,
dataSourceManager);
assertNotNull(actual);
assertThat(actual.size(), is(1));
}
@@ -103,17 +111,19 @@ public final class InventoryDataTaskSplitterTest {
private void initIntPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
+ Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (id INT PRIMARY KEY,
user_id VARCHAR(12))");
- statement.execute("INSERT INTO t_order (id, user_id) VALUES (1,
'xxx'), (999, 'yyy')");
+ for (int i = 1; i <= 100; i++) {
+ statement.execute(String.format("INSERT INTO t_order (id,
user_id) VALUES (%d, 'x')", i));
+ }
}
}
private void initCharPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
+ Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (id CHAR(3) PRIMARY KEY,
user_id VARCHAR(12))");
statement.execute("INSERT INTO t_order (id, user_id) VALUES ('1',
'xxx'), ('999', 'yyy')");
@@ -123,7 +133,7 @@ public final class InventoryDataTaskSplitterTest {
private void initUnionPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
+ Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (id INT, user_id
VARCHAR(12), PRIMARY KEY (id, user_id))");
statement.execute("INSERT INTO t_order (id, user_id) VALUES (1,
'xxx'), (999, 'yyy')");
@@ -133,7 +143,7 @@ public final class InventoryDataTaskSplitterTest {
private void initNoPrimaryEnvironment(final DumperConfiguration
dumperConfig) throws SQLException {
DataSource dataSource =
dataSourceManager.getDataSource(dumperConfig.getDataSourceConfig());
try (Connection connection = dataSource.getConnection();
- Statement statement = connection.createStatement()) {
+ Statement statement = connection.createStatement()) {
statement.execute("DROP TABLE IF EXISTS t_order");
statement.execute("CREATE TABLE t_order (id INT, user_id
VARCHAR(12))");
statement.execute("INSERT INTO t_order (id, user_id) VALUES (1,
'xxx'), (999, 'yyy')");
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
index 9d25a50..60a4c7d 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/job/task/inventory/InventoryDataScalingTaskTest.java
@@ -20,6 +20,7 @@ package
org.apache.shardingsphere.scaling.core.job.task.inventory;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import
org.apache.shardingsphere.scaling.core.config.InventoryDumperConfiguration;
+import org.apache.shardingsphere.scaling.core.config.JobConfiguration;
import
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ScalingContext;
import
org.apache.shardingsphere.scaling.core.config.datasource.DataSourceConfiguration;
@@ -58,7 +59,7 @@ public final class InventoryDataScalingTaskTest {
DumperConfiguration dumperConfig = mockDumperConfig();
ImporterConfiguration importerConfig = mockImporterConfig();
ScalingContext.getInstance().init(new ServerConfiguration());
- syncConfig = new SyncConfiguration(3, dumperConfig, importerConfig);
+ syncConfig = new SyncConfiguration(new JobConfiguration(),
dumperConfig, importerConfig);
dataSourceManager = new DataSourceManager();
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
index bef398a..092b4df 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.mysql;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -30,6 +31,7 @@ import
org.apache.shardingsphere.scaling.mysql.component.MySQLDataSourceChecker;
import org.apache.shardingsphere.scaling.mysql.component.MySQLImporter;
import org.apache.shardingsphere.scaling.mysql.component.MySQLJdbcDumper;
import org.apache.shardingsphere.scaling.mysql.component.MySQLPositionManager;
+import
org.apache.shardingsphere.scaling.mysql.component.MySQLScalingSQLBuilder;
/**
* MySQL scaling entry.
@@ -67,6 +69,11 @@ public final class MySQLScalingEntry implements ScalingEntry
{
}
@Override
+ public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+ return MySQLScalingSQLBuilder.class;
+ }
+
+ @Override
public String getDatabaseType() {
return "MySQL";
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
index d05581e..bdaaf1a 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLDataConsistencyChecker.java
@@ -97,7 +97,7 @@ public final class MySQLDataConsistencyChecker extends
AbstractDataConsistencyCh
}
@Override
- protected MySQLSQLBuilder getSqlBuilder() {
- return new MySQLSQLBuilder(Maps.newHashMap());
+ protected MySQLScalingSQLBuilder getSqlBuilder() {
+ return new MySQLScalingSQLBuilder(Maps.newHashMap());
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
index 8de96a1..39f2155 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLImporter.java
@@ -21,7 +21,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.utils.JDBCUtil;
import java.util.Map;
@@ -38,7 +38,7 @@ public final class MySQLImporter extends AbstractJDBCImporter
{
}
@Override
- protected AbstractSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
- return new MySQLSQLBuilder(shardingColumnsMap);
+ protected ScalingSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
+ return new MySQLScalingSQLBuilder(shardingColumnsMap);
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
similarity index 87%
rename from
shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilder.java
rename to
shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
index 608dc62..4c308b6 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilder.java
@@ -17,9 +17,10 @@
package org.apache.shardingsphere.scaling.mysql.component;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.AbstractScalingSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.utils.ShardingColumnsUtil;
import java.util.Map;
@@ -28,9 +29,9 @@ import java.util.Set;
/**
* MySQL SQL builder.
*/
-public final class MySQLSQLBuilder extends AbstractSQLBuilder {
+public final class MySQLScalingSQLBuilder extends AbstractScalingSQLBuilder
implements ScalingSQLBuilder {
- public MySQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+ public MySQLScalingSQLBuilder(final Map<String, Set<String>>
shardingColumnsMap) {
super(shardingColumnsMap);
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilderTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilderTest.java
similarity index 92%
rename from
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilderTest.java
rename to
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilderTest.java
index 94203bc..61ea317 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLSQLBuilderTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/component/MySQLScalingSQLBuilderTest.java
@@ -29,9 +29,9 @@ import java.util.Set;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class MySQLSQLBuilderTest {
+public final class MySQLScalingSQLBuilderTest {
- private final MySQLSQLBuilder sqlBuilder = new
MySQLSQLBuilder(ImmutableMap.<String, Set<String>>builder().put("t2",
Sets.newHashSet("sc")).build());
+ private final MySQLScalingSQLBuilder sqlBuilder = new
MySQLScalingSQLBuilder(ImmutableMap.<String, Set<String>>builder().put("t2",
Sets.newHashSet("sc")).build());
@Test
public void assertBuildInsertSQL() {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
index 42fa66f..6ddec40 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLScalingEntry.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.scaling.postgresql;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.JDBCDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.dumper.LogDumper;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.Importer;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.position.PositionManager;
import
org.apache.shardingsphere.scaling.core.job.preparer.checker.DataSourceChecker;
@@ -29,6 +30,7 @@ import
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLDataSour
import
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLImporter;
import
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLJdbcDumper;
import
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLPositionManager;
+import
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLScalingSQLBuilder;
import
org.apache.shardingsphere.scaling.postgresql.component.PostgreSQLWalDumper;
/**
@@ -67,6 +69,11 @@ public final class PostgreSQLScalingEntry implements
ScalingEntry {
}
@Override
+ public Class<? extends ScalingSQLBuilder> getSQLBuilderClass() {
+ return PostgreSQLScalingSQLBuilder.class;
+ }
+
+ @Override
public String getDatabaseType() {
return "PostgreSQL";
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
index 8e02c3d..2063120 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLDataConsistencyChecker.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.scaling.postgresql.component;
import com.google.common.collect.Maps;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import
org.apache.shardingsphere.scaling.core.job.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.job.check.DataConsistencyChecker;
@@ -41,7 +41,7 @@ public final class PostgreSQLDataConsistencyChecker extends
AbstractDataConsiste
}
@Override
- protected AbstractSQLBuilder getSqlBuilder() {
- return new PostgreSQLSQLBuilder(Maps.newHashMap());
+ protected ScalingSQLBuilder getSqlBuilder() {
+ return new PostgreSQLScalingSQLBuilder(Maps.newHashMap());
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
index 2409029..bae6296 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLImporter.java
@@ -20,7 +20,7 @@ package
org.apache.shardingsphere.scaling.postgresql.component;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import java.util.Map;
import java.util.Set;
@@ -35,8 +35,8 @@ public final class PostgreSQLImporter extends
AbstractJDBCImporter {
}
@Override
- protected AbstractSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
- return new PostgreSQLSQLBuilder(shardingColumnsMap);
+ protected ScalingSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
+ return new PostgreSQLScalingSQLBuilder(shardingColumnsMap);
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilder.java
similarity index 83%
rename from
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSQLBuilder.java
rename to
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilder.java
index b5bb520..5316bb6 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilder.java
@@ -20,7 +20,8 @@ package
org.apache.shardingsphere.scaling.postgresql.component;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.AbstractScalingSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.sqlbuilder.ScalingSQLBuilder;
import java.util.Map;
import java.util.Set;
@@ -28,9 +29,9 @@ import java.util.Set;
/**
* PostgreSQL SQL builder.
*/
-public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
+public final class PostgreSQLScalingSQLBuilder extends
AbstractScalingSQLBuilder implements ScalingSQLBuilder {
- public PostgreSQLSQLBuilder(final Map<String, Set<String>>
shardingColumnsMap) {
+ public PostgreSQLScalingSQLBuilder(final Map<String, Set<String>>
shardingColumnsMap) {
super(shardingColumnsMap);
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSqlBuilderTest.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
similarity index 91%
rename from
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSqlBuilderTest.java
rename to
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
index 247a6b2..70bf3c2 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLSqlBuilderTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLScalingSQLBuilderTest.java
@@ -27,11 +27,11 @@ import org.postgresql.replication.LogSequenceNumber;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class PostgreSQLSqlBuilderTest {
+public final class PostgreSQLScalingSQLBuilderTest {
@Test
public void assertBuildInsertSQL() {
- String actual = new
PostgreSQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
+ String actual = new
PostgreSQLScalingSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\")
VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
}