This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new cd287d6530f Use ShardingSphereIdentifier on ShardingColumnsExtractor (#33949)
cd287d6530f is described below
commit cd287d6530f6e80251a6f6947351dc8e8f23bf89
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 7 13:06:00 2024 +0800
Use ShardingSphereIdentifier on ShardingColumnsExtractor (#33949)
* Fix E2EEnvironmentEngine
* Use ShardingSphereIdentifier on ShardingColumnsExtractor
---
.../ShardingRuleConfigurationConverter.java | 2 --
.../core/importer/ImporterConfiguration.java | 12 ++++----
.../core/util/ShardingColumnsExtractor.java | 32 ++++++++++------------
.../core/importer/ImporterConfigurationTest.java | 6 ++--
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 6 ++--
.../migration/MigrationJobExecutorCallback.java | 8 +++---
.../test/e2e/env/E2EEnvironmentEngine.java | 3 +-
.../sink/type/PipelineDataSourceSinkTest.java | 4 +--
.../pipeline/core/util/PipelineContextUtils.java | 8 +++---
9 files changed, 40 insertions(+), 41 deletions(-)
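
The common thread in the diff below is a key-type swap in the pipeline's sharding-columns map: CaseInsensitiveIdentifier gives way to ShardingSphereIdentifier as both the map key and the lookup key. A minimal sketch of the pattern, not taken from the diff (table and column names are made up, and it assumes ShardingSphereIdentifier keeps the case-insensitive equality its predecessor provided, which is what makes it a drop-in map key):

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;

    // Keys are wrapped identifiers rather than raw strings, so table-name
    // case is normalized in one place (assumption noted above).
    Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap =
            Collections.singletonMap(new ShardingSphereIdentifier("t_order"), Collections.singleton("order_id"));
    Set<String> shardingColumns = shardingColumnsMap.getOrDefault(new ShardingSphereIdentifier("T_ORDER"), Collections.emptySet());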
diff --git a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java
index f869417b033..66e12f94d66 100644
--- a/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java
+++ b/features/sharding/core/src/main/java/org/apache/shardingsphere/sharding/yaml/swapper/ShardingRuleConfigurationConverter.java
@@ -38,7 +38,6 @@ public final class ShardingRuleConfigurationConverter {
*
* @param yamlRuleConfigs YAML rule configurations
* @return sharding rule configuration
- * @throws IllegalStateException if there is no available sharding rule
*/
    public static Optional<ShardingRuleConfiguration> findAndConvertShardingRuleConfiguration(final Collection<YamlRuleConfiguration> yamlRuleConfigs) {
        return findYamlShardingRuleConfiguration(yamlRuleConfigs).map(each -> new YamlShardingRuleConfigurationSwapper().swapToObject(each));
@@ -49,7 +48,6 @@ public final class ShardingRuleConfigurationConverter {
*
* @param yamlRuleConfigs YAML rule configurations
* @return YAML sharding rule configuration
- * @throws IllegalStateException if there is no available sharding rule
*/
    public static Optional<YamlShardingRuleConfiguration> findYamlShardingRuleConfiguration(final Collection<YamlRuleConfiguration> yamlRuleConfigs) {
        return yamlRuleConfigs.stream().filter(YamlShardingRuleConfiguration.class::isInstance).findFirst().map(YamlShardingRuleConfiguration.class::cast);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index 58cfd0e3909..bb0ebca58cd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -17,16 +17,17 @@
package org.apache.shardingsphere.data.pipeline.core.importer;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import java.util.Collection;
import java.util.Collections;
@@ -45,7 +46,8 @@ public final class ImporterConfiguration {
private final PipelineDataSourceConfiguration dataSourceConfig;
-    private final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap;
+    @Getter(AccessLevel.NONE)
+    private final Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap;
private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
@@ -64,7 +66,7 @@ public final class ImporterConfiguration {
* @return sharding columns
*/
public Set<String> getShardingColumns(final String logicTableName) {
-        return shardingColumnsMap.getOrDefault(new CaseInsensitiveIdentifier(logicTableName), Collections.emptySet());
+        return shardingColumnsMap.getOrDefault(new ShardingSphereIdentifier(logicTableName), Collections.emptySet());
}
/**
@@ -85,6 +87,6 @@ public final class ImporterConfiguration {
*/
public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
return shardingColumnsMap.keySet().stream()
-                .map(CaseInsensitiveIdentifier::toString).map(each -> new CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), each)).collect(Collectors.toList());
+                .map(ShardingSphereIdentifier::getValue).map(each -> new CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), each)).collect(Collectors.toList());
}
}
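
For context, getShardingColumns keeps its empty-set fallback across the key swap. A brief usage sketch (not from the diff; table and column names are hypothetical, with importerConfig built the same way as in ImporterConfigurationTest further down):

    // Assuming importerConfig was constructed with a t_order -> order_id entry:
    Set<String> known = importerConfig.getShardingColumns("t_order");        // contains "order_id"
    Set<String> missing = importerConfig.getShardingColumns("no_such_tbl");  // empty set, never null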
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java
index 276b8581c70..951227516d0 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/util/ShardingColumnsExtractor.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.util;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.yaml.config.pojo.rule.YamlRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.ShardingRuleConfiguration;
import org.apache.shardingsphere.sharding.api.config.rule.ShardingAutoTableRuleConfiguration;
@@ -48,39 +48,37 @@ public final class ShardingColumnsExtractor {
* @param logicTableNames logic table names
* @return sharding columns map
*/
-    public Map<CaseInsensitiveIdentifier, Set<String>> getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs, final Set<CaseInsensitiveIdentifier> logicTableNames) {
+    public Map<ShardingSphereIdentifier, Set<String>> getShardingColumnsMap(final Collection<YamlRuleConfiguration> yamlRuleConfigs, final Collection<ShardingSphereIdentifier> logicTableNames) {
        Optional<ShardingRuleConfiguration> shardingRuleConfig = ShardingRuleConfigurationConverter.findAndConvertShardingRuleConfiguration(yamlRuleConfigs);
        if (!shardingRuleConfig.isPresent()) {
            return Collections.emptyMap();
        }
        Set<String> defaultDatabaseShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultDatabaseShardingStrategy());
        Set<String> defaultTableShardingColumns = extractShardingColumns(shardingRuleConfig.get().getDefaultTableShardingStrategy());
-        Map<CaseInsensitiveIdentifier, Set<String>> result = new ConcurrentHashMap<>(shardingRuleConfig.get().getTables().size(), 1F);
+        // TODO check whether it needs to be ConcurrentHashMap
+        // TODO check whether it needs to be ShardingSphereIdentifier with column names
+        Map<ShardingSphereIdentifier, Set<String>> result = new ConcurrentHashMap<>(shardingRuleConfig.get().getTables().size(), 1F);
        for (ShardingTableRuleConfiguration each : shardingRuleConfig.get().getTables()) {
-            CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable());
-            if (!logicTableNames.contains(logicTableName)) {
-                continue;
+            ShardingSphereIdentifier logicTableName = new ShardingSphereIdentifier(each.getLogicTable());
+            if (logicTableNames.contains(logicTableName)) {
+                Set<String> shardingColumns = new HashSet<>();
+                shardingColumns.addAll(null == each.getDatabaseShardingStrategy() ? defaultDatabaseShardingColumns : extractShardingColumns(each.getDatabaseShardingStrategy()));
+                shardingColumns.addAll(null == each.getTableShardingStrategy() ? defaultTableShardingColumns : extractShardingColumns(each.getTableShardingStrategy()));
+                result.put(logicTableName, shardingColumns);
            }
-            Set<String> shardingColumns = new HashSet<>();
-            shardingColumns.addAll(null == each.getDatabaseShardingStrategy() ? defaultDatabaseShardingColumns : extractShardingColumns(each.getDatabaseShardingStrategy()));
-            shardingColumns.addAll(null == each.getTableShardingStrategy() ? defaultTableShardingColumns : extractShardingColumns(each.getTableShardingStrategy()));
-            result.put(logicTableName, shardingColumns);
        }
        for (ShardingAutoTableRuleConfiguration each : shardingRuleConfig.get().getAutoTables()) {
-            CaseInsensitiveIdentifier logicTableName = new CaseInsensitiveIdentifier(each.getLogicTable());
-            if (!logicTableNames.contains(logicTableName)) {
-                continue;
+            ShardingSphereIdentifier logicTableName = new ShardingSphereIdentifier(each.getLogicTable());
+            if (logicTableNames.contains(logicTableName)) {
+                result.put(logicTableName, extractShardingColumns(each.getShardingStrategy()));
            }
-            ShardingStrategyConfiguration shardingStrategy = each.getShardingStrategy();
-            Set<String> shardingColumns = new HashSet<>(extractShardingColumns(shardingStrategy));
-            result.put(logicTableName, shardingColumns);
        }
        return result;
    }
    private Set<String> extractShardingColumns(final ShardingStrategyConfiguration shardingStrategy) {
        if (shardingStrategy instanceof StandardShardingStrategyConfiguration) {
-            return new HashSet<>(Collections.singleton(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn()));
+            return Collections.singleton(((StandardShardingStrategyConfiguration) shardingStrategy).getShardingColumn());
        }
        if (shardingStrategy instanceof ComplexShardingStrategyConfiguration) {
            return new HashSet<>(Arrays.asList(((ComplexShardingStrategyConfiguration) shardingStrategy).getShardingColumns().split(",")));
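
A caller-side sketch of the reworked extractor; note the second parameter widened from Set<CaseInsensitiveIdentifier> to Collection<ShardingSphereIdentifier>, so any collection now satisfies it. This is not from the diff: yamlRuleConfigs is assumed to be loaded from a pipeline job's root configuration (as the CDC and migration callers below obtain it), and the table names are made up:

    import java.util.Collection;
    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
    import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;

    Collection<ShardingSphereIdentifier> logicTableNames =
            Stream.of("t_order", "t_order_item").map(ShardingSphereIdentifier::new).collect(Collectors.toList());
    // Returns an empty map when no sharding rule is configured, per the guard above.
    Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap =
            new ShardingColumnsExtractor().getShardingColumnsMap(yamlRuleConfigs, logicTableNames);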
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
index e7725dcd564..d0bbb7024aa 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
@@ -21,8 +21,8 @@ import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
@@ -39,7 +39,7 @@ class ImporterConfigurationTest {
@Test
void assertGetShardingColumns() {
ImporterConfiguration importerConfig = new ImporterConfiguration(
-                mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new CaseInsensitiveIdentifier("foo_tbl"), Collections.singleton("foo_col")),
+                mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"), Collections.singleton("foo_col")),
                mock(TableAndSchemaNameMapper.class), 1, mock(JobRateLimitAlgorithm.class), 1, 1);
        assertThat(importerConfig.getShardingColumns("foo_tbl"), is(Collections.singleton("foo_col")));
    }
@@ -57,7 +57,7 @@ class ImporterConfigurationTest {
        TableAndSchemaNameMapper tableAndSchemaNameMapper = mock(TableAndSchemaNameMapper.class);
        when(tableAndSchemaNameMapper.getSchemaName("foo_tbl")).thenReturn("foo_schema");
        ImporterConfiguration importerConfig = new ImporterConfiguration(
-                mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new CaseInsensitiveIdentifier("foo_tbl"), Collections.singleton("foo_col")),
+                mock(PipelineDataSourceConfiguration.class), Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"), Collections.singleton("foo_col")),
                tableAndSchemaNameMapper, 1, mock(JobRateLimitAlgorithm.class), 1, 1);
        assertThat(importerConfig.getQualifiedTables(), is(Collections.singletonList(new CaseInsensitiveQualifiedTable("foo_schema", "foo_tbl"))));
    }
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 7a111945446..03cab1ef778 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -64,7 +64,7 @@ import org.apache.shardingsphere.data.pipeline.core.registrycenter.repository.Pi
import org.apache.shardingsphere.data.pipeline.core.task.PipelineTask;
import org.apache.shardingsphere.data.pipeline.core.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import java.util.Collection;
@@ -149,8 +149,8 @@ public final class CDCJob implements PipelineJob {
                                                                          final Collection<String> schemaTableNames, final TableAndSchemaNameMapper mapper) {
        PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(
                jobConfig.getDataSourceConfig().getType(), jobConfig.getDataSourceConfig().getParameter());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
-                .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
+                .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet()));
        PipelineWriteConfiguration write = pipelineProcessConfig.getWrite();
        JobRateLimitAlgorithm writeRateLimitAlgorithm = null == write.getRateLimiter() ? null
                : TypedSPILoader.getService(JobRateLimitAlgorithm.class, write.getRateLimiter().getType(), write.getRateLimiter().getProps());
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
index 2fde01e7632..cb1f86c252e 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobExecutorCallback.java
@@ -40,8 +40,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.
import org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import java.sql.SQLException;
import java.util.Collection;
@@ -65,8 +65,8 @@ public final class MigrationJobExecutorCallback implements DistributedPipelineJo
    private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
        IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobDataNodeLine(jobShardingItem));
        Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
+        Set<ShardingSphereIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet());
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
                ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
        return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
@@ -86,7 +86,7 @@ public final class MigrationJobExecutorCallback implements DistributedPipelineJo
    }
    private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                             final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper mapper) {
+                                                             final Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper mapper) {
        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
        JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
        int retryTimes = jobConfig.getRetryTimes();
diff --git a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java
index c15f87c0846..b1eb9e3b03c 100644
--- a/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java
+++ b/test/e2e/sql/src/test/java/org/apache/shardingsphere/test/e2e/env/E2EEnvironmentEngine.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.test.e2e.env;
import lombok.Getter;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.infra.database.core.DefaultDatabase;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.test.e2e.container.compose.ContainerComposer;
import org.apache.shardingsphere.test.e2e.container.compose.ContainerComposerRegistry;
@@ -68,7 +69,7 @@ public final class E2EEnvironmentEngine {
@SneakyThrows({SQLException.class, IOException.class})
    private void executeLogicDatabaseInitSQLFileOnlyOnce(final String key, final String scenario, final DatabaseType databaseType, final DataSource targetDataSource) {
-        Optional<String> logicDatabaseInitSQLFile = new ScenarioDataPath(scenario).findActualDatabaseInitSQLFile("foo_db", databaseType);
+        Optional<String> logicDatabaseInitSQLFile = new ScenarioDataPath(scenario).findActualDatabaseInitSQLFile(DefaultDatabase.LOGIC_NAME, databaseType);
if (!logicDatabaseInitSQLFile.isPresent()) {
return;
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
index 53e13552ac9..b15284017af 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/type/PipelineDataSourceSinkTest.java
@@ -32,7 +32,7 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.NormalColumn;
import org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.test.it.data.pipeline.core.fixture.algorithm.FixtureTransmissionJobItemContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -90,7 +90,7 @@ class PipelineDataSourceSinkTest {
}
private ImporterConfiguration mockImporterConfiguration() {
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = Collections.singletonMap(new CaseInsensitiveIdentifier("test_table"), Collections.singleton("user"));
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = Collections.singletonMap(new ShardingSphereIdentifier("test_table"), Collections.singleton("user"));
        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index eb3b1038854..94ace243fbe 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -52,10 +52,10 @@ import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
-import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
import org.apache.shardingsphere.infra.yaml.config.swapper.mode.YamlModeConfigurationSwapper;
@@ -210,8 +210,8 @@ public final class PipelineContextUtils {
    private static MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
        IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
        Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
+        Set<ShardingSphereIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(ShardingSphereIdentifier::new).collect(Collectors.toSet());
+        Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
                ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
        return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
@@ -233,7 +233,7 @@ public final class PipelineContextUtils {
}
    private static ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                                    final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+                                                                    final Map<ShardingSphereIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
        JobRateLimitAlgorithm writeRateLimitAlgorithm = new TransmissionProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
int retryTimes = jobConfig.getRetryTimes();