This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7a2b03c Use insert on duplicate key update in mysql insert (#8004)
7a2b03c is described below
commit 7a2b03c48d0b8c5d967df1d43607484ecd49a6e2
Author: avalon5666 <[email protected]>
AuthorDate: Wed Nov 4 16:06:22 2020 +0800
Use insert on duplicate key update in mysql insert (#8004)
* Refactor AbstractSQLBuilder & AbstractJDBCImporter
* Move extract sql paramters code from AbstractJDBCImporter to
AbstractSQLBuilder
* Refactor AbstractSQLBuilder
* Refactor AbstractJDBCImporter
* Make columns of DataRecord private
* Use insert on duplicate key update in mysql insert.
---
.../executor/importer/AbstractJDBCImporter.java | 41 ++++-------
.../executor/importer/AbstractSQLBuilder.java | 82 +++++++++++++---------
.../execute/executor/importer/PreparedSQL.java} | 34 ++++-----
.../core/execute/executor/record/DataRecord.java | 6 +-
.../core/execute/executor/record/RecordUtil.java | 55 +++++++--------
.../importer/AbstractJDBCImporterTest.java | 19 ++---
.../executor/importer/AbstractSqlBuilderTest.java | 74 ++++++++++++++-----
.../fixture/FixtureDataConsistencyChecker.java | 3 +-
.../scaling/mysql/MySQLDataConsistencyChecker.java | 3 +-
.../scaling/mysql/MySQLImporter.java | 7 +-
.../scaling/mysql/MySQLSQLBuilder.java | 28 ++++++++
.../scaling/mysql/MySQLImporterTest.java | 8 ++-
.../scaling/mysql/MySQLSqlBuilderTest.java} | 18 +++--
.../PostgreSQLDataConsistencyChecker.java | 3 +-
.../scaling/postgresql/PostgreSQLImporter.java | 7 +-
.../scaling/postgresql/PostgreSQLSQLBuilder.java | 20 ++++--
.../scaling/postgresql/PostgreSQLImporterTest.java | 8 ++-
.../postgresql/PostgreSQLSqlBuilderTest.java | 8 ++-
18 files changed, 254 insertions(+), 170 deletions(-)
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
index 2271cfa..e36eabc 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporter.java
@@ -25,11 +25,9 @@ import
org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import
org.apache.shardingsphere.scaling.core.exception.SyncTaskExecuteException;
import
org.apache.shardingsphere.scaling.core.execute.executor.AbstractShardingScalingExecutor;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.IncrementalPosition;
import javax.sql.DataSource;
@@ -37,9 +35,10 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* Abstract JDBC importer implementation.
@@ -59,7 +58,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
protected AbstractJDBCImporter(final ImporterConfiguration importerConfig,
final DataSourceManager dataSourceManager) {
this.importerConfig = importerConfig;
this.dataSourceManager = dataSourceManager;
- sqlBuilder = createSQLBuilder();
+ sqlBuilder = createSQLBuilder(importerConfig.getShardingColumnsMap());
}
/**
@@ -67,7 +66,7 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
*
* @return SQL builder
*/
- protected abstract AbstractSQLBuilder createSQLBuilder();
+ protected abstract AbstractSQLBuilder createSQLBuilder(Map<String,
Set<String>> shardingColumnsMap);
@Override
public final void start() {
@@ -141,37 +140,25 @@ public abstract class AbstractJDBCImporter extends
AbstractShardingScalingExecut
}
private void executeInsert(final Connection connection, final DataRecord
record) throws SQLException {
- String insertSql = sqlBuilder.buildInsertSQL(record);
- PreparedStatement ps = connection.prepareStatement(insertSql);
- ps.setQueryTimeout(30);
try {
- for (int i = 0; i < record.getColumnCount(); i++) {
- ps.setObject(i + 1, record.getColumn(i).getValue());
- }
- ps.execute();
+ executeSQL(connection, record, sqlBuilder.buildInsertSQL(record));
} catch (final SQLIntegrityConstraintViolationException ignored) {
}
}
private void executeUpdate(final Connection connection, final DataRecord
record) throws SQLException {
- List<Column> conditionColumns =
RecordUtil.extractConditionColumns(record,
importerConfig.getShardingColumnsMap().get(record.getTableName()));
- List<Column> values = new ArrayList<>();
- values.addAll(RecordUtil.extractUpdatedColumns(record));
- values.addAll(conditionColumns);
- String updateSql = sqlBuilder.buildUpdateSQL(record, conditionColumns);
- PreparedStatement ps = connection.prepareStatement(updateSql);
- for (int i = 0; i < values.size(); i++) {
- ps.setObject(i + 1, values.get(i).getValue());
- }
- ps.execute();
+ executeSQL(connection, record, sqlBuilder.buildUpdateSQL(record));
}
private void executeDelete(final Connection connection, final DataRecord
record) throws SQLException {
- List<Column> conditionColumns =
RecordUtil.extractConditionColumns(record,
importerConfig.getShardingColumnsMap().get(record.getTableName()));
- String deleteSql = sqlBuilder.buildDeleteSQL(record, conditionColumns);
- PreparedStatement ps = connection.prepareStatement(deleteSql);
- for (int i = 0; i < conditionColumns.size(); i++) {
- ps.setObject(i + 1, conditionColumns.get(i).getValue());
+ executeSQL(connection, record, sqlBuilder.buildDeleteSQL(record));
+ }
+
+ private void executeSQL(final Connection connection, final DataRecord
record, final PreparedSQL preparedSQL) throws SQLException {
+ PreparedStatement ps =
connection.prepareStatement(preparedSQL.getSql());
+ for (int i = 0; i < preparedSQL.getValuesIndex().size(); i++) {
+ int columnIndex = preparedSQL.getValuesIndex().get(i);
+ ps.setObject(i + 1, record.getColumn(columnIndex).getValue());
}
ps.execute();
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
index e102b05..fc191df 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSQLBuilder.java
@@ -17,18 +17,21 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
-import com.google.common.collect.Collections2;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import java.util.Collection;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Abstract SQL builder.
*/
+@RequiredArgsConstructor
public abstract class AbstractSQLBuilder {
private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
@@ -37,7 +40,9 @@ public abstract class AbstractSQLBuilder {
private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
- private final ConcurrentMap<String, String> sqlCacheMap = new
ConcurrentHashMap<>();
+ private final Map<String, Set<String>> shardingColumnsMap;
+
+ private final ConcurrentMap<String, PreparedSQL> sqlCacheMap = new
ConcurrentHashMap<>();
/**
* Get left identifier quote string.
@@ -67,79 +72,90 @@ public abstract class AbstractSQLBuilder {
* Build insert SQL.
*
* @param dataRecord data record
- * @return insert SQL
+ * @return insert prepared SQL
*/
- public String buildInsertSQL(final DataRecord dataRecord) {
+ public PreparedSQL buildInsertSQL(final DataRecord dataRecord) {
String sqlCacheKey = INSERT_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey,
buildInsertSQLInternal(dataRecord.getTableName(), dataRecord.getColumns()));
+ sqlCacheMap.put(sqlCacheKey, buildInsertSQLInternal(dataRecord));
}
return sqlCacheMap.get(sqlCacheKey);
}
- private String buildInsertSQLInternal(final String tableName, final
List<Column> columns) {
+ protected PreparedSQL buildInsertSQLInternal(final DataRecord dataRecord) {
StringBuilder columnsLiteral = new StringBuilder();
StringBuilder holder = new StringBuilder();
- for (Column each : columns) {
- columnsLiteral.append(String.format("%s,", quote(each.getName())));
+ List<Integer> valuesIndex = new ArrayList<>();
+ for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+ columnsLiteral.append(String.format("%s,",
quote(dataRecord.getColumn(i).getName())));
holder.append("?,");
+ valuesIndex.add(i);
}
columnsLiteral.setLength(columnsLiteral.length() - 1);
holder.setLength(holder.length() - 1);
- return String.format("INSERT INTO %s(%s) VALUES(%s)",
quote(tableName), columnsLiteral, holder);
+ return new PreparedSQL(
+ String.format("INSERT INTO %s(%s) VALUES(%s)",
quote(dataRecord.getTableName()), columnsLiteral, holder),
+ valuesIndex);
}
/**
* Build update SQL.
*
* @param dataRecord data record
- * @param conditionColumns condition columns
- * @return update SQL
+ * @return update prepared SQL
*/
- public String buildUpdateSQL(final DataRecord dataRecord, final
Collection<Column> conditionColumns) {
+ public PreparedSQL buildUpdateSQL(final DataRecord dataRecord) {
String sqlCacheKey = UPDATE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey,
buildUpdateSQLInternal(dataRecord.getTableName(), conditionColumns));
+ sqlCacheMap.put(sqlCacheKey, buildUpdateSQLInternal(dataRecord));
}
StringBuilder updatedColumnString = new StringBuilder();
- for (Column each : extractUpdatedColumns(dataRecord.getColumns())) {
- updatedColumnString.append(String.format("%s = ?,",
quote(each.getName())));
+ List<Integer> valuesIndex = new ArrayList<>();
+ for (Integer each : RecordUtil.extractUpdatedColumns(dataRecord)) {
+ updatedColumnString.append(String.format("%s = ?,",
quote(dataRecord.getColumn(each).getName())));
+ valuesIndex.add(each);
}
updatedColumnString.setLength(updatedColumnString.length() - 1);
- return String.format(sqlCacheMap.get(sqlCacheKey),
updatedColumnString);
- }
-
- private String buildUpdateSQLInternal(final String tableName, final
Collection<Column> conditionColumns) {
- return String.format("UPDATE %s SET %%s WHERE %s", quote(tableName),
buildWhereSQL(conditionColumns));
+ PreparedSQL preparedSQL = sqlCacheMap.get(sqlCacheKey);
+ valuesIndex.addAll(preparedSQL.getValuesIndex());
+ return new PreparedSQL(
+ String.format(preparedSQL.getSql(), updatedColumnString),
+ valuesIndex);
}
- private Collection<Column> extractUpdatedColumns(final Collection<Column>
columns) {
- return Collections2.filter(columns, Column::isUpdated);
+ private PreparedSQL buildUpdateSQLInternal(final DataRecord dataRecord) {
+ List<Integer> valuesIndex = new ArrayList<>();
+ return new PreparedSQL(
+ String.format("UPDATE %s SET %%s WHERE %s",
quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, valuesIndex)),
+ valuesIndex);
}
/**
* Build delete SQL.
*
* @param dataRecord data record
- * @param conditionColumns condition columns
- * @return delete SQL
+ * @return delete prepared SQL
*/
- public String buildDeleteSQL(final DataRecord dataRecord, final
Collection<Column> conditionColumns) {
+ public PreparedSQL buildDeleteSQL(final DataRecord dataRecord) {
String sqlCacheKey = DELETE_SQL_CACHE_KEY_PREFIX +
dataRecord.getTableName();
if (!sqlCacheMap.containsKey(sqlCacheKey)) {
- sqlCacheMap.put(sqlCacheKey,
buildDeleteSQLInternal(dataRecord.getTableName(), conditionColumns));
+ sqlCacheMap.put(sqlCacheKey, buildDeleteSQLInternal(dataRecord));
}
return sqlCacheMap.get(sqlCacheKey);
}
- private String buildDeleteSQLInternal(final String tableName, final
Collection<Column> conditionColumns) {
- return String.format("DELETE FROM %s WHERE %s", quote(tableName),
buildWhereSQL(conditionColumns));
+ private PreparedSQL buildDeleteSQLInternal(final DataRecord dataRecord) {
+ List<Integer> columnsIndex = new ArrayList<>();
+ return new PreparedSQL(
+ String.format("DELETE FROM %s WHERE %s",
quote(dataRecord.getTableName()), buildWhereSQL(dataRecord, columnsIndex)),
+ columnsIndex);
}
- private String buildWhereSQL(final Collection<Column> conditionColumns) {
+ private String buildWhereSQL(final DataRecord dataRecord, final
List<Integer> valuesIndex) {
StringBuilder where = new StringBuilder();
- for (Column each : conditionColumns) {
- where.append(String.format("%s = ? and ", quote(each.getName())));
+ for (Integer each : RecordUtil.extractConditionColumns(dataRecord,
shardingColumnsMap.get(dataRecord.getTableName()))) {
+ where.append(String.format("%s = ? and ",
quote(dataRecord.getColumn(each).getName())));
+ valuesIndex.add(each);
}
where.setLength(where.length() - 5);
return where.toString();
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java
similarity index 52%
copy from
shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
copy to
shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java
index f9960b6..fe07e10 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/PreparedSQL.java
@@ -15,33 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.mysql;
+package org.apache.shardingsphere.scaling.core.execute.executor.importer;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import lombok.Getter;
+
+import java.util.Collections;
+import java.util.List;
/**
- * MySQL SQL builder.
+ * Prepared SQL, include complete sql and complete values index list.
*/
-public final class MySQLSQLBuilder extends AbstractSQLBuilder {
+@Getter
+public class PreparedSQL {
- @Override
- public String getLeftIdentifierQuoteString() {
- return "`";
- }
+ private final String sql;
- @Override
- public String getRightIdentifierQuoteString() {
- return "`";
- }
+ private final List<Integer> valuesIndex;
- /**
- * Build select sum crc32 SQL.
- *
- * @param tableName table Name
- * @param column column
- * @return select sum crc32 SQL
- */
- public String buildSumCrc32SQL(final String tableName, final String
column) {
- return String.format("SELECT SUM(CRC32(%s)) from %s", quote(column),
quote(tableName));
+ public PreparedSQL(final String sql, final List<Integer> valuesIndex) {
+ this.sql = sql;
+ this.valuesIndex = Collections.unmodifiableList(valuesIndex);
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
index e8594c0..ffb88cd 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/DataRecord.java
@@ -31,8 +31,6 @@ import java.util.List;
/**
* Data record.
*/
-@Setter
-@Getter
@EqualsAndHashCode(of = {"tableName", "primaryKeyValue"}, callSuper = false)
@ToString
public final class DataRecord extends Record {
@@ -41,8 +39,12 @@ public final class DataRecord extends Record {
private final List<Object> primaryKeyValue = new LinkedList<>();
+ @Setter
+ @Getter
private String type;
+ @Setter
+ @Getter
private String tableName;
public DataRecord(final Position position, final int columnCount) {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
index d9e38bb..0a72b08 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/main/java/org/apache/shardingsphere/scaling/core/execute/executor/record/RecordUtil.java
@@ -20,9 +20,10 @@ package
org.apache.shardingsphere.scaling.core.execute.executor.record;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Record utility.
@@ -31,51 +32,45 @@ import java.util.Set;
public final class RecordUtil {
/**
- * Extract primary columns from data record.
+ * Extract primary columns index from data record.
*
* @param dataRecord data record
- * @return primary columns
+ * @return primary columns index
*/
- public static List<Column> extractPrimaryColumns(final DataRecord
dataRecord) {
- List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
- for (Column each : dataRecord.getColumns()) {
- if (each.isPrimaryKey()) {
- result.add(each);
- }
- }
- return result;
+ public static List<Integer> extractPrimaryColumns(final DataRecord
dataRecord) {
+ return IntStream.range(0, dataRecord.getColumnCount())
+ .filter(each -> dataRecord.getColumn(each).isPrimaryKey())
+ .mapToObj(each -> each)
+ .collect(Collectors.toList());
}
/**
- * Extract condition columns(include primary and sharding columns) from
data record.
+ * Extract condition columns(include primary and sharding columns) index
from data record.
*
* @param dataRecord data record
* @param shardingColumns sharding columns
- * @return condition columns
+ * @return condition columns index
*/
- public static List<Column> extractConditionColumns(final DataRecord
dataRecord, final Set<String> shardingColumns) {
- List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
- for (Column each : dataRecord.getColumns()) {
- if (each.isPrimaryKey() ||
shardingColumns.contains(each.getName())) {
- result.add(each);
- }
- }
- return result;
+ public static List<Integer> extractConditionColumns(final DataRecord
dataRecord, final Set<String> shardingColumns) {
+ return IntStream.range(0, dataRecord.getColumnCount())
+ .filter(each -> {
+ Column column = dataRecord.getColumn(each);
+ return column.isPrimaryKey() ||
shardingColumns.contains(column.getName());
+ })
+ .mapToObj(each -> each)
+ .collect(Collectors.toList());
}
/**
* Extract updated columns from data record.
*
* @param dataRecord data record
- * @return updated columns
+ * @return updated columns index
*/
- public static List<Column> extractUpdatedColumns(final DataRecord
dataRecord) {
- List<Column> result = new ArrayList<>(dataRecord.getColumns().size());
- for (Column each : dataRecord.getColumns()) {
- if (each.isUpdated()) {
- result.add(each);
- }
- }
- return result;
+ public static List<Integer> extractUpdatedColumns(final DataRecord
dataRecord) {
+ return IntStream.range(0, dataRecord.getColumnCount())
+ .filter(each -> dataRecord.getColumn(each).isUpdated())
+ .mapToObj(each -> each)
+ .collect(Collectors.toList());
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
index 91e6c77..3283dd0 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractJDBCImporterTest.java
@@ -17,17 +17,17 @@
package org.apache.shardingsphere.scaling.core.execute.executor.importer;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import
org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
+import
org.apache.shardingsphere.scaling.core.config.ScalingDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.execute.executor.channel.Channel;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.FinishedRecord;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Record;
-import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
import org.junit.Before;
import org.junit.Test;
@@ -39,7 +39,6 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -87,7 +86,7 @@ public final class AbstractJDBCImporterTest {
jdbcImporter = new AbstractJDBCImporter(getImporterConfiguration(),
dataSourceManager) {
@Override
- protected AbstractSQLBuilder createSQLBuilder() {
+ protected AbstractSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
return sqlBuilder;
}
};
@@ -97,9 +96,9 @@ public final class AbstractJDBCImporterTest {
}
@Test
- public void assertWriteInsertDataRecord() throws SQLException {
+ public void assertInsertDataRecord() throws SQLException {
DataRecord insertRecord = getDataRecord("INSERT");
- when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(INSERT_SQL);
+ when(sqlBuilder.buildInsertSQL(insertRecord)).thenReturn(new
PreparedSQL(INSERT_SQL, Lists.newArrayList(0, 1, 2)));
when(connection.prepareStatement(INSERT_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100,
3)).thenReturn(mockRecords(insertRecord));
jdbcImporter.run();
@@ -112,7 +111,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertDeleteDataRecord() throws SQLException {
DataRecord deleteRecord = getDataRecord("DELETE");
- when(sqlBuilder.buildDeleteSQL(deleteRecord,
mockConditionColumns(deleteRecord))).thenReturn(DELETE_SQL);
+ when(sqlBuilder.buildDeleteSQL(deleteRecord)).thenReturn(new
PreparedSQL(DELETE_SQL, Lists.newArrayList(0, 1)));
when(connection.prepareStatement(DELETE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100,
3)).thenReturn(mockRecords(deleteRecord));
jdbcImporter.run();
@@ -124,7 +123,7 @@ public final class AbstractJDBCImporterTest {
@Test
public void assertUpdateDataRecord() throws SQLException {
DataRecord updateRecord = getDataRecord("UPDATE");
- when(sqlBuilder.buildUpdateSQL(updateRecord,
mockConditionColumns(updateRecord))).thenReturn(UPDATE_SQL);
+ when(sqlBuilder.buildUpdateSQL(updateRecord)).thenReturn(new
PreparedSQL(UPDATE_SQL, Lists.newArrayList(1, 2, 0, 1)));
when(connection.prepareStatement(UPDATE_SQL)).thenReturn(preparedStatement);
when(channel.fetchRecords(100,
3)).thenReturn(mockRecords(updateRecord));
jdbcImporter.run();
@@ -135,10 +134,6 @@ public final class AbstractJDBCImporterTest {
verify(preparedStatement).execute();
}
- private Collection<Column> mockConditionColumns(final DataRecord
dataRecord) {
- return RecordUtil.extractConditionColumns(dataRecord,
Sets.newHashSet("user"));
- }
-
private List<Record> mockRecords(final DataRecord dataRecord) {
List<Record> result = new LinkedList<>();
result.add(dataRecord);
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
index c61a481..4cc12e4 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/execute/executor/importer/AbstractSqlBuilderTest.java
@@ -20,29 +20,37 @@ package
org.apache.shardingsphere.scaling.core.execute.executor.importer;
import com.google.common.collect.Sets;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
import org.apache.shardingsphere.scaling.core.job.position.NopPosition;
+import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
-import java.util.Collection;
+import java.util.Map;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+@RunWith(MockitoJUnitRunner.class)
public class AbstractSqlBuilderTest {
+ @Mock
+ private Map shardingColumnsMap;
+
private AbstractSQLBuilder sqlBuilder;
@Before
public void setUp() {
- sqlBuilder = new AbstractSQLBuilder() {
+ sqlBuilder = new AbstractSQLBuilder(shardingColumnsMap) {
@Override
protected String getLeftIdentifierQuoteString() {
return "`";
}
-
+
@Override
protected String getRightIdentifierQuoteString() {
return "`";
@@ -52,38 +60,66 @@ public class AbstractSqlBuilderTest {
@Test
public void assertBuildInsertSQL() {
- String actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
- assertThat(actual, is("INSERT INTO `t1`(`id`,`sc`,`c1`,`c2`,`c3`)
VALUES(?,?,?,?,?)"));
+ PreparedSQL actual = sqlBuilder.buildInsertSQL(mockDataRecord("t1"));
+ assertThat(actual.getSql(), is("INSERT INTO
`t1`(`id`,`sc`,`c1`,`c2`,`c3`) VALUES(?,?,?,?,?)"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(0, 1, 2, 3, 4));
}
@Test
public void assertBuildUpdateSQLWithPrimaryKey() {
- String actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"),
RecordUtil.extractPrimaryColumns(mockDataRecord("t2")));
- assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ?
WHERE `id` = ?"));
+ when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet());
+ PreparedSQL actual = sqlBuilder.buildUpdateSQL(mockDataRecord("t2"));
+ assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3`
= ? WHERE `id` = ?"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(2, 3, 4, 0));
}
@Test
public void assertBuildUpdateSQLWithShardingColumns() {
+ when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet("sc"));
DataRecord dataRecord = mockDataRecord("t2");
- String actual = sqlBuilder.buildUpdateSQL(dataRecord,
mockConditionColumns(dataRecord));
- assertThat(actual, is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3` = ?
WHERE `id` = ? and `sc` = ?"));
+ PreparedSQL actual = sqlBuilder.buildUpdateSQL(dataRecord);
+ assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3`
= ? WHERE `id` = ? and `sc` = ?"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(2, 3, 4, 0, 1));
}
@Test
- public void assertBuildDeleteSQLWithPrimaryKey() {
- String actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"),
RecordUtil.extractPrimaryColumns(mockDataRecord("t3")));
- assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ?"));
+ public void assertBuildUpdateSQLWithShardingColumnsUseCache() {
+ when(shardingColumnsMap.get("t2")).thenReturn(Sets.newHashSet("sc"));
+ DataRecord dataRecord = mockDataRecord("t2");
+ PreparedSQL actual = sqlBuilder.buildUpdateSQL(dataRecord);
+ assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c2` = ?,`c3`
= ? WHERE `id` = ? and `sc` = ?"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(2, 3, 4, 0, 1));
+ actual = sqlBuilder.buildUpdateSQL(mockDataRecord2("t2"));
+ assertThat(actual.getSql(), is("UPDATE `t2` SET `c1` = ?,`c3` = ?
WHERE `id` = ? and `sc` = ?"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(2, 4, 0, 1));
+ }
+
+ private DataRecord mockDataRecord2(final String tableName) {
+ DataRecord result = new DataRecord(new NopPosition(), 4);
+ result.setTableName(tableName);
+ result.addColumn(new Column("id", "", false, true));
+ result.addColumn(new Column("sc", "", false, false));
+ result.addColumn(new Column("c1", "", true, false));
+ result.addColumn(new Column("c2", "", false, false));
+ result.addColumn(new Column("c3", "", true, false));
+ return result;
}
@Test
- public void assertBuildDeleteSQLWithConditionColumns() {
- DataRecord dataRecord = mockDataRecord("t3");
- String actual = sqlBuilder.buildDeleteSQL(dataRecord,
mockConditionColumns(dataRecord));
- assertThat(actual, is("DELETE FROM `t3` WHERE `id` = ? and `sc` = ?"));
+ public void assertBuildDeleteSQLWithPrimaryKey() {
+ when(shardingColumnsMap.get("t3")).thenReturn(Sets.newHashSet());
+ PreparedSQL actual = sqlBuilder.buildDeleteSQL(mockDataRecord("t3"));
+ assertThat(actual.getSql(), is("DELETE FROM `t3` WHERE `id` = ?"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(0));
}
- private Collection<Column> mockConditionColumns(final DataRecord
dataRecord) {
- return RecordUtil.extractConditionColumns(dataRecord,
Sets.newHashSet("sc"));
+ @Test
+ public void assertBuildDeleteSQLWithShardingColumns() {
+ when(shardingColumnsMap.get("t3")).thenReturn(Sets.newHashSet("sc"));
+ DataRecord dataRecord = mockDataRecord("t3");
+ PreparedSQL actual = sqlBuilder.buildDeleteSQL(dataRecord);
+ assertThat(actual.getSql(), is("DELETE FROM `t3` WHERE `id` = ? and
`sc` = ?"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(0, 1));
}
private DataRecord mockDataRecord(final String tableName) {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
index f5f315e..1c13d4e 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-core/src/test/java/org/apache/shardingsphere/scaling/core/fixture/FixtureDataConsistencyChecker.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.core.fixture;
+import com.google.common.collect.Maps;
import
org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyCheckResult;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
@@ -44,7 +45,7 @@ public final class FixtureDataConsistencyChecker extends
AbstractDataConsistency
@Override
protected AbstractSQLBuilder getSqlBuilder() {
- return new AbstractSQLBuilder() {
+ return new AbstractSQLBuilder(Maps.newHashMap()) {
@Override
protected String getLeftIdentifierQuoteString() {
return "`";
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
index 41f9e61..fb44038 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLDataConsistencyChecker.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.mysql;
+import com.google.common.collect.Maps;
import
org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceWrapper;
@@ -98,6 +99,6 @@ public final class MySQLDataConsistencyChecker extends
AbstractDataConsistencyCh
@Override
protected MySQLSQLBuilder getSqlBuilder() {
- return new MySQLSQLBuilder();
+ return new MySQLSQLBuilder(Maps.newHashMap());
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
index a22f5ac..97a971a 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLImporter.java
@@ -22,6 +22,9 @@ import
org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import java.util.Map;
+import java.util.Set;
+
/**
* MySQL importer.
*/
@@ -32,7 +35,7 @@ public final class MySQLImporter extends AbstractJDBCImporter
{
}
@Override
- protected AbstractSQLBuilder createSQLBuilder() {
- return new MySQLSQLBuilder();
+ protected AbstractSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
+ return new MySQLSQLBuilder(shardingColumnsMap);
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
index f9960b6..5da75de 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/main/java/org/apache/shardingsphere/scaling/mysql/MySQLSQLBuilder.java
@@ -18,12 +18,24 @@
package org.apache.shardingsphere.scaling.mysql;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
+import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* MySQL SQL builder.
*/
public final class MySQLSQLBuilder extends AbstractSQLBuilder {
+ public MySQLSQLBuilder(final Map<String, Set<String>> shardingColumnsMap) {
+ super(shardingColumnsMap);
+ }
+
@Override
public String getLeftIdentifierQuoteString() {
return "`";
@@ -34,6 +46,22 @@ public final class MySQLSQLBuilder extends
AbstractSQLBuilder {
return "`";
}
+ @Override
+ protected PreparedSQL buildInsertSQLInternal(final DataRecord dataRecord) {
+ PreparedSQL preparedSQL = super.buildInsertSQLInternal(dataRecord);
+ StringBuilder insertSQL = new StringBuilder(preparedSQL.getSql() + "
ON DUPLICATE KEY UPDATE ");
+ List<Integer> valuesIndex = new
ArrayList<>(preparedSQL.getValuesIndex());
+ for (int i = 0; i < dataRecord.getColumnCount(); i++) {
+ Column column = dataRecord.getColumn(i);
+ if (!dataRecord.getColumn(i).isPrimaryKey()) {
+ insertSQL.append(quote(column.getName())).append("=?,");
+ valuesIndex.add(i);
+ }
+ }
+ insertSQL.setLength(insertSQL.length() - 1);
+ return new PreparedSQL(insertSQL.toString(), valuesIndex);
+ }
+
/**
* Build select sum crc32 SQL.
*
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
index 7d8e7f2..8b89a63 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLImporterTest.java
@@ -17,11 +17,14 @@
package org.apache.shardingsphere.scaling.mysql;
+import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import
org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -42,8 +45,9 @@ public final class MySQLImporterTest {
@Test
public void assertCreateSqlBuilder() {
MySQLImporter mySQLImporter = new MySQLImporter(importerConfig,
dataSourceManager);
- String insertSQL =
mySQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
- assertThat(insertSQL, is("INSERT INTO `t_order`(`id`,`name`)
VALUES(?,?)"));
+ PreparedSQL insertSQL =
mySQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
+ assertThat(insertSQL.getSql(), is("INSERT INTO `t_order`(`id`,`name`)
VALUES(?,?) ON DUPLICATE KEY UPDATE `name`=?"));
+ assertThat(insertSQL.getValuesIndex().toArray(),
Matchers.arrayContaining(0, 1, 1));
}
private DataRecord mockDataRecord() {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java
similarity index 63%
copy from
shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
copy to
shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java
index 3f14f4c..12adaa5 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-mysql/src/test/java/org/apache/shardingsphere/scaling/mysql/MySQLSqlBuilderTest.java
@@ -15,30 +15,34 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.scaling.postgresql;
+package org.apache.shardingsphere.scaling.mysql;
+import com.google.common.collect.Maps;
+import
org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
-import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.apache.shardingsphere.scaling.mysql.binlog.BinlogPosition;
+import org.hamcrest.Matchers;
import org.junit.Test;
-import org.postgresql.replication.LogSequenceNumber;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-public final class PostgreSQLSqlBuilderTest {
+public final class MySQLSqlBuilderTest {
@Test
public void assertBuildInsertSQL() {
- String actual = new
PostgreSQLSQLBuilder().buildInsertSQL(mockDataRecord());
- assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\")
VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+ PreparedSQL actual = new
MySQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
+ assertThat(actual.getSql(), is("INSERT INTO
`t_order`(`id`,`name`,`age`) VALUES(?,?,?) ON DUPLICATE KEY UPDATE
`name`=?,`age`=?"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(0, 1, 2, 1, 2));
}
private DataRecord mockDataRecord() {
- DataRecord result = new DataRecord(new
WalPosition(LogSequenceNumber.valueOf(100L)), 2);
+ DataRecord result = new DataRecord(new BinlogPosition("", 1), 2);
result.setTableName("t_order");
result.addColumn(new Column("id", 1, true, true));
result.addColumn(new Column("name", "", true, false));
+ result.addColumn(new Column("age", 1, true, false));
return result;
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
index 97f2712..df1d0c3 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLDataConsistencyChecker.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.scaling.postgresql;
+import com.google.common.collect.Maps;
import
org.apache.shardingsphere.scaling.core.check.AbstractDataConsistencyChecker;
import org.apache.shardingsphere.scaling.core.check.DataConsistencyChecker;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
@@ -41,6 +42,6 @@ public final class PostgreSQLDataConsistencyChecker extends
AbstractDataConsiste
@Override
protected AbstractSQLBuilder getSqlBuilder() {
- return new PostgreSQLSQLBuilder();
+ return new PostgreSQLSQLBuilder(Maps.newHashMap());
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
index 0422d4b..8faa00c 100755
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporter.java
@@ -22,6 +22,9 @@ import
org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractJDBCImporter;
import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import java.util.Map;
+import java.util.Set;
+
/**
* postgreSQL importer.
*/
@@ -32,8 +35,8 @@ public final class PostgreSQLImporter extends
AbstractJDBCImporter {
}
@Override
- protected AbstractSQLBuilder createSQLBuilder() {
- return new PostgreSQLSQLBuilder();
+ protected AbstractSQLBuilder createSQLBuilder(final Map<String,
Set<String>> shardingColumnsMap) {
+ return new PostgreSQLSQLBuilder(shardingColumnsMap);
}
}
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
index e59fb36..488a09e 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSQLBuilder.java
@@ -17,16 +17,23 @@
package org.apache.shardingsphere.scaling.postgresql;
-import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
+import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+import
org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.RecordUtil;
-import
org.apache.shardingsphere.scaling.core.execute.executor.importer.AbstractSQLBuilder;
+
+import java.util.Map;
+import java.util.Set;
/**
* PostgreSQL SQL builder.
*/
public final class PostgreSQLSQLBuilder extends AbstractSQLBuilder {
+ public PostgreSQLSQLBuilder(final Map<String, Set<String>>
shardingColumnsMap) {
+ super(shardingColumnsMap);
+ }
+
@Override
public String getLeftIdentifierQuoteString() {
return "\"";
@@ -38,14 +45,15 @@ public final class PostgreSQLSQLBuilder extends
AbstractSQLBuilder {
}
@Override
- public String buildInsertSQL(final DataRecord dataRecord) {
- return super.buildInsertSQL(dataRecord) + buildConflictSQL(dataRecord);
+ public PreparedSQL buildInsertSQL(final DataRecord dataRecord) {
+ PreparedSQL preparedSQL = super.buildInsertSQL(dataRecord);
+ return new PreparedSQL(preparedSQL.getSql() +
buildConflictSQL(dataRecord), preparedSQL.getValuesIndex());
}
private String buildConflictSQL(final DataRecord dataRecord) {
StringBuilder result = new StringBuilder(" ON CONFLICT (");
- for (Column each : RecordUtil.extractPrimaryColumns(dataRecord)) {
- result.append(each.getName()).append(",");
+ for (Integer each : RecordUtil.extractPrimaryColumns(dataRecord)) {
+ result.append(dataRecord.getColumn(each).getName()).append(",");
}
result.setLength(result.length() - 1);
result.append(") DO NOTHING");
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
index f7b4e68..19a1bf9 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLImporterTest.java
@@ -17,11 +17,14 @@
package org.apache.shardingsphere.scaling.postgresql;
+import com.google.common.collect.Maps;
import org.apache.shardingsphere.scaling.core.config.ImporterConfiguration;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
+import
org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -43,8 +46,9 @@ public final class PostgreSQLImporterTest {
@Test
public void assertCreateSQLBuilder() {
PostgreSQLImporter postgreSQLImporter = new
PostgreSQLImporter(importerConfig, dataSourceManager);
- String insertSQL =
postgreSQLImporter.createSQLBuilder().buildInsertSQL(mockDataRecord());
- assertThat(insertSQL, is("INSERT INTO \"t_order\"(\"id\",\"name\")
VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+ PreparedSQL insertSQL =
postgreSQLImporter.createSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
+ assertThat(insertSQL.getSql(), is("INSERT INTO
\"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+ assertThat(insertSQL.getValuesIndex().toArray(),
Matchers.arrayContaining(0, 1));
}
private DataRecord mockDataRecord() {
diff --git
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
index 3f14f4c..885c5f4 100644
---
a/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
+++
b/shardingsphere-scaling/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/PostgreSQLSqlBuilderTest.java
@@ -17,9 +17,12 @@
package org.apache.shardingsphere.scaling.postgresql;
+import com.google.common.collect.Maps;
+import
org.apache.shardingsphere.scaling.core.execute.executor.importer.PreparedSQL;
import org.apache.shardingsphere.scaling.core.execute.executor.record.Column;
import
org.apache.shardingsphere.scaling.core.execute.executor.record.DataRecord;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.postgresql.replication.LogSequenceNumber;
@@ -30,8 +33,9 @@ public final class PostgreSQLSqlBuilderTest {
@Test
public void assertBuildInsertSQL() {
- String actual = new
PostgreSQLSQLBuilder().buildInsertSQL(mockDataRecord());
- assertThat(actual, is("INSERT INTO \"t_order\"(\"id\",\"name\")
VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+ PreparedSQL actual = new
PostgreSQLSQLBuilder(Maps.newHashMap()).buildInsertSQL(mockDataRecord());
+ assertThat(actual.getSql(), is("INSERT INTO
\"t_order\"(\"id\",\"name\") VALUES(?,?) ON CONFLICT (id) DO NOTHING"));
+ assertThat(actual.getValuesIndex().toArray(),
Matchers.arrayContaining(0, 1));
}
private DataRecord mockDataRecord() {