This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 453f2ffd5f3 Add inventory records count at progress (#21664)
453f2ffd5f3 is described below
commit 453f2ffd5f3af9c1ec446dc4ccb28371daf62801
Author: Xinze Guo <[email protected]>
AuthorDate: Fri Oct 21 10:59:48 2022 +0800
Add inventory records count at progress (#21664)
---
.../InventoryIncrementalJobItemProgress.java | 2 ++
.../impl/InventoryIncrementalJobItemAPIImpl.java | 1 +
.../InventoryIncrementalJobItemContext.java | 14 ++++++++++++++
.../YamlInventoryIncrementalJobItemProgress.java | 2 ++
...InventoryIncrementalJobItemProgressSwapper.java | 2 ++
.../core/prepare/InventoryTaskSplitter.java | 22 ++++++++++++++++++++--
.../sqlbuilder/AbstractPipelineSQLBuilder.java | 2 +-
.../migration/MigrationJobItemContext.java | 8 ++++++++
8 files changed, 50 insertions(+), 3 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
index e529214631f..9cc4284a747 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/job/progress/InventoryIncrementalJobItemProgress.java
@@ -42,5 +42,7 @@ public final class InventoryIncrementalJobItemProgress
implements PipelineJobIte
private long processedRecordsCount;
+ private long inventoryRecordsCount;
+
private String errorMessage;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
index e1750c66025..01eda47a28c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/api/impl/InventoryIncrementalJobItemAPIImpl.java
@@ -56,6 +56,7 @@ public final class InventoryIncrementalJobItemAPIImpl
implements PipelineJobItem
jobItemProgress.setIncremental(getIncrementalTasksProgress(context.getIncrementalTasks()));
jobItemProgress.setInventory(getInventoryTasksProgress(context.getInventoryTasks()));
jobItemProgress.setProcessedRecordsCount(context.getProcessedRecordsCount());
+
jobItemProgress.setInventoryRecordsCount(context.getInventoryRecordsCount());
String value =
YamlEngine.marshal(SWAPPER.swapToYamlConfiguration(jobItemProgress));
PipelineAPIFactory.getGovernanceRepositoryAPI().persistJobItemProgress(jobItemContext.getJobId(),
jobItemContext.getShardingItem(), value);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
index 046aa1d254b..104d4e12a52 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/context/InventoryIncrementalJobItemContext.java
@@ -76,4 +76,18 @@ public interface InventoryIncrementalJobItemContext extends
PipelineJobItemConte
* @return processed record count.
*/
long getProcessedRecordsCount();
+
+ /**
+ * Init inventory records count.
+ *
+ * @param recordsCount records count
+ */
+ void initInventoryRecordsCount(long recordsCount);
+
+ /**
+ * Get inventory records count.
+ *
+ * @return inventory records count
+ */
+ long getInventoryRecordsCount();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
index 956a90506a6..d9936fff379 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgress.java
@@ -39,4 +39,6 @@ public final class YamlInventoryIncrementalJobItemProgress
implements YamlConfig
private YamlJobItemIncrementalTasksProgress incremental;
private long processedRecordsCount;
+
+ private long inventoryRecordsCount;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
index 1e31d99d7c7..3dbb18a6077 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/YamlInventoryIncrementalJobItemProgressSwapper.java
@@ -39,6 +39,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
result.setInventory(inventoryTasksProgressSwapper.swapToYaml(progress.getInventory()));
result.setIncremental(incrementalTasksProgressSwapper.swapToYaml(progress.getIncremental()));
result.setProcessedRecordsCount(progress.getProcessedRecordsCount());
+ result.setInventoryRecordsCount(progress.getInventoryRecordsCount());
return result;
}
@@ -51,6 +52,7 @@ public final class
YamlInventoryIncrementalJobItemProgressSwapper implements Yam
result.setInventory(inventoryTasksProgressSwapper.swapToObject(yamlProgress.getInventory()));
result.setIncremental(incrementalTasksProgressSwapper.swapToObject(yamlProgress.getSourceDatabaseType(),
yamlProgress.getIncremental()));
result.setProcessedRecordsCount(yamlProgress.getProcessedRecordsCount());
+
result.setInventoryRecordsCount(yamlProgress.getInventoryRecordsCount());
return result;
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
index 9cfcd247ca2..40c81189147 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/prepare/InventoryTaskSplitter.java
@@ -149,7 +149,7 @@ public final class InventoryTaskSplitter {
return getPositionByIntegerPrimaryKeyRange(jobItemContext,
dataSource, dumperConfig);
}
if (PipelineJdbcUtils.isStringColumn(uniqueKeyDataType)) {
- return getPositionByStringPrimaryKeyRange();
+ return getPositionByStringPrimaryKeyRange(jobItemContext,
dataSource, dumperConfig);
}
throw new
SplitPipelineJobByRangeException(dumperConfig.getActualTableName(), "primary
key is not integer or string type");
}
@@ -166,6 +166,7 @@ public final class InventoryTaskSplitter {
PreparedStatement ps = connection.prepareStatement(sql)) {
// TODO query minimum value less than 0
long beginId = 0;
+ long recordsCount = 0;
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ps.setLong(1, beginId);
ps.setLong(2, shardingSize);
@@ -175,6 +176,7 @@ public final class InventoryTaskSplitter {
break;
}
long endId = rs.getLong(1);
+ recordsCount += rs.getLong(2);
if (0 == endId) {
log.info("getPositionByPrimaryKeyRange, endId is 0,
break, tableName={}, primaryKey={}, beginId={}",
dumperConfig.getActualTableName(), dumperConfig.getUniqueKey(), beginId);
break;
@@ -183,6 +185,7 @@ public final class InventoryTaskSplitter {
beginId = endId + 1;
}
}
+ jobItemContext.initInventoryRecordsCount(recordsCount);
// fix empty table missing inventory task
if (result.isEmpty()) {
result.add(new IntegerPrimaryKeyPosition(0, 0));
@@ -193,7 +196,22 @@ public final class InventoryTaskSplitter {
return result;
}
- private Collection<IngestPosition<?>> getPositionByStringPrimaryKeyRange()
{
+ private Collection<IngestPosition<?>>
getPositionByStringPrimaryKeyRange(final InventoryIncrementalJobItemContext
jobItemContext, final DataSource dataSource,
+
final InventoryDumperConfiguration dumperConfig) {
+ PipelineJobConfiguration jobConfig = jobItemContext.getJobConfig();
+ String schemaName = dumperConfig.getSchemaName(new
LogicTableName(dumperConfig.getLogicTableName()));
+ String actualTableName = dumperConfig.getActualTableName();
+ String sql =
PipelineSQLBuilderFactory.getInstance(jobConfig.getSourceDatabaseType()).buildCountSQL(schemaName,
actualTableName);
+ try (
+ Connection connection = dataSource.getConnection();
+ PreparedStatement ps = connection.prepareStatement(sql)) {
+ try (ResultSet rs = ps.executeQuery()) {
+ rs.next();
+ jobItemContext.initInventoryRecordsCount(rs.getLong(1));
+ }
+ } catch (final SQLException ex) {
+ throw new
SplitPipelineJobByUniqueKeyException(dumperConfig.getActualTableName(),
dumperConfig.getUniqueKey(), ex);
+ }
Collection<IngestPosition<?>> result = new LinkedList<>();
result.add(new StringPrimaryKeyPosition("!", "~"));
return result;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
index d65408f499f..75bd659b63c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/AbstractPipelineSQLBuilder.java
@@ -181,7 +181,7 @@ public abstract class AbstractPipelineSQLBuilder implements
PipelineSQLBuilder {
@Override
public String buildSplitByPrimaryKeyRangeSQL(final String schemaName,
final String tableName, final String primaryKey) {
String quotedUniqueKey = quote(primaryKey);
- return String.format("SELECT MAX(%s) FROM (SELECT %s FROM %s WHERE
%s>=? ORDER BY %s LIMIT ?) t",
+ return String.format("SELECT MAX(%s),COUNT(*) FROM (SELECT %s FROM %s
WHERE %s>=? ORDER BY %s LIMIT ?) t",
quotedUniqueKey, quotedUniqueKey,
getQualifiedTableName(schemaName, tableName), quotedUniqueKey, quotedUniqueKey);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
index c5a5f2301ba..adcdd6761d1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobItemContext.java
@@ -66,6 +66,8 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
private final AtomicLong processedRecordsCount = new AtomicLong(0);
+ private volatile long inventoryRecordsCount;
+
private final MigrationJobConfiguration jobConfig;
private final MigrationProcessContext jobProcessContext;
@@ -97,6 +99,7 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
this.initProgress = initProgress;
if (null != initProgress) {
processedRecordsCount.set(initProgress.getProcessedRecordsCount());
+ inventoryRecordsCount = initProgress.getInventoryRecordsCount();
}
this.jobProcessContext = jobProcessContext;
this.taskConfig = taskConfig;
@@ -138,4 +141,9 @@ public final class MigrationJobItemContext implements
InventoryIncrementalJobIte
public long getProcessedRecordsCount() {
return processedRecordsCount.get();
}
+
+ @Override
+ public void initInventoryRecordsCount(final long recordsCount) {
+ inventoryRecordsCount = recordsCount;
+ }
}