This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd2b197db1 Use ShardingSphereIdentifier to replace CaseInsensitiveIdentifier (#33950)
5fd2b197db1 is described below

commit 5fd2b197db15a5beab8921023ae99a88228cae6c
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 7 13:42:03 2024 +0800

    Use ShardingSphereIdentifier to replace CaseInsensitiveIdentifier (#33950)
    
    * Use ShardingSphereIdentifier to replace CaseInsensitiveIdentifier
    
    * Use ShardingSphereIdentifier to replace CaseInsensitiveIdentifier
    
    * Use ShardingSphereIdentifier to replace CaseInsensitiveIdentifier
---
 .../caseinsensitive/CaseInsensitiveIdentifier.java | 40 --------------------
 .../CaseInsensitiveQualifiedTable.java             | 11 +++---
 .../identifier/ShardingSphereIdentifier.java       |  5 +++
 .../CaseInsensitiveIdentifierTest.java             | 37 ------------------
 .../checker/PipelineDataSourceCheckEngine.java     |  4 +-
 .../table/MatchingTableInventoryChecker.java       |  8 ++--
 .../SingleTableInventoryCalculateParameter.java    |  4 +-
 .../core/datanode/JobDataNodeLineConvertUtils.java |  6 +--
 .../mapper/ActualAndLogicTableNameMapper.java      | 10 ++---
 .../dumper/mapper/TableAndSchemaNameMapper.java    | 14 +++----
 .../StandardPipelineTableMetaDataLoader.java       | 44 +++++++++++-----------
 .../core/metadata/model/PipelineIndexMetaData.java |  4 +-
 .../core/metadata/model/PipelineTableMetaData.java |  8 ++--
 .../datasource/PipelineJobDataSourcePreparer.java  |  8 ++--
 .../splitter/InventoryDumperContextSplitter.java   |  4 +-
 .../metadata/model/PipelineTableMetaDataTest.java  |  4 +-
 .../incremental/dumper/MySQLIncrementalDumper.java |  4 +-
 .../dumper/MySQLIncrementalDumperTest.java         | 14 +++----
 .../ingest/incremental/wal/WALEventConverter.java  |  4 +-
 .../dumper/PostgreSQLIncrementalDumperTest.java    |  8 ++--
 .../incremental/wal/WALEventConverterTest.java     | 10 ++---
 .../MigrationDataConsistencyChecker.java           |  3 +-
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  2 +-
 23 files changed, 92 insertions(+), 164 deletions(-)

diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveIdentifier.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveIdentifier.java
deleted file mode 100644
index 5149ec6a6ac..00000000000
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveIdentifier.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.metadata.caseinsensitive;
-
-import com.cedarsoftware.util.CaseInsensitiveMap.CaseInsensitiveString;
-import lombok.EqualsAndHashCode;
-
-/**
- * Case insensitive identifier.
- */
-// TODO table name case-sensitive for some database
-@EqualsAndHashCode
-public final class CaseInsensitiveIdentifier {
-    
-    private final CaseInsensitiveString original;
-    
-    public CaseInsensitiveIdentifier(final String identifier) {
-        original = null == identifier ? null : new 
CaseInsensitiveString(identifier);
-    }
-    
-    @Override
-    public String toString() {
-        return null == original ? null : original.toString();
-    }
-}
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveQualifiedTable.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveQualifiedTable.java
index 96e4c27bf2c..d27d573d391 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveQualifiedTable.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveQualifiedTable.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.infra.metadata.caseinsensitive;
 
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 /**
  * Case insensitive qualified table.
@@ -28,17 +29,17 @@ import lombok.Getter;
 // TODO should merge with QualifiedTable
 public final class CaseInsensitiveQualifiedTable {
     
-    private final CaseInsensitiveIdentifier schemaName;
+    private final ShardingSphereIdentifier schemaName;
     
-    private final CaseInsensitiveIdentifier tableName;
+    private final ShardingSphereIdentifier tableName;
     
     public CaseInsensitiveQualifiedTable(final String schemaName, final String 
tableName) {
-        this.schemaName = new CaseInsensitiveIdentifier(schemaName);
-        this.tableName = new CaseInsensitiveIdentifier(tableName);
+        this.schemaName = new ShardingSphereIdentifier(schemaName);
+        this.tableName = new ShardingSphereIdentifier(tableName);
     }
     
     @Override
     public String toString() {
-        return null == schemaName.toString() ? tableName.toString() : 
String.join(".", schemaName.toString(), tableName.toString());
+        return null == schemaName.getValue() ? tableName.getValue() : 
String.join(".", schemaName.getValue(), tableName.getValue());
     }
 }
diff --git a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/identifier/ShardingSphereIdentifier.java b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/identifier/ShardingSphereIdentifier.java
index ea0699c70f1..7f54a7bad7e 100644
--- a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/identifier/ShardingSphereIdentifier.java
+++ b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/identifier/ShardingSphereIdentifier.java
@@ -75,4 +75,9 @@ public final class ShardingSphereIdentifier {
     public int hashCode() {
         return isCaseSensitive ? value.toString().hashCode() : 
value.hashCode();
     }
+    
+    @Override
+    public String toString() {
+        return getValue();
+    }
 }
diff --git a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveIdentifierTest.java b/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveIdentifierTest.java
deleted file mode 100644
index 6bf978bdb13..00000000000
--- a/infra/common/src/test/java/org/apache/shardingsphere/infra/metadata/caseinsensitive/CaseInsensitiveIdentifierTest.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.infra.metadata.caseinsensitive;
-
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-class CaseInsensitiveIdentifierTest {
-    
-    @Test
-    void assertEquals() {
-        assertThat(new CaseInsensitiveIdentifier("t_order"), is(new 
CaseInsensitiveIdentifier("T_ORDER")));
-    }
-    
-    @Test
-    void assertToString() {
-        CaseInsensitiveIdentifier actual = new 
CaseInsensitiveIdentifier("T_ORDER");
-        assertThat(actual.toString(), is("T_ORDER"));
-    }
-}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
index 66c2137a016..03bfdfafe9c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
@@ -91,7 +91,7 @@ public final class PipelineDataSourceCheckEngine {
         try {
             for (DataSource each : dataSources) {
                 for (CaseInsensitiveQualifiedTable qualifiedTable : 
importerConfig.getQualifiedTables()) {
-                    
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), 
() -> new 
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().toString()));
+                    
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), 
() -> new 
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().getValue()));
                 }
             }
         } catch (final SQLException ex) {
@@ -108,7 +108,7 @@ public final class PipelineDataSourceCheckEngine {
      * @throws SQLException if there's database operation failure
      */
     public boolean checkEmptyTable(final DataSource dataSource, final 
CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
-        String sql = 
sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName().toString(), 
qualifiedTable.getTableName().toString());
+        String sql = 
sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName().getValue(), 
qualifiedTable.getTableName().getValue());
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 9a98e67a92f..b26358e8b50 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -73,9 +73,9 @@ public abstract class MatchingTableInventoryChecker implements TableInventoryChe
     
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final TableInventoryCheckParameter param, final 
ThreadPoolExecutor executor) {
         SingleTableInventoryCalculateParameter sourceParam = new 
SingleTableInventoryCalculateParameter(param.getSourceDataSource(), 
param.getSourceTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().toString()));
+                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getValue()));
         SingleTableInventoryCalculateParameter targetParam = new 
SingleTableInventoryCalculateParameter(param.getTargetDataSource(), 
param.getTargetTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().toString()));
+                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getValue()));
         SingleTableInventoryCalculator sourceCalculator = 
buildSingleTableInventoryCalculator();
         this.sourceCalculator = sourceCalculator;
         SingleTableInventoryCalculator targetCalculator = 
buildSingleTableInventoryCalculator();
@@ -108,10 +108,10 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
                 break;
             }
             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().toString(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
+                
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().getValue(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().toString(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
+                
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().getValue(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
             param.getProgressContext().onProgressUpdated(new 
PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
         }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
index 76391012ac4..62d34286768 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
@@ -92,7 +92,7 @@ public final class SingleTableInventoryCalculateParameter {
      * @return schema name
      */
     public String getSchemaName() {
-        return table.getSchemaName().toString();
+        return table.getSchemaName().getValue();
     }
     
     /**
@@ -101,7 +101,7 @@ public final class SingleTableInventoryCalculateParameter {
      * @return logic table name
      */
     public String getLogicTableName() {
-        return table.getTableName().toString();
+        return table.getTableName().getValue();
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeLineConvertUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeLineConvertUtils.java
index 4d16d743542..5692f107bf4 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeLineConvertUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/datanode/JobDataNodeLineConvertUtils.java
@@ -20,9 +20,9 @@ package org.apache.shardingsphere.data.pipeline.core.datanode;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.tuple.Pair;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -75,10 +75,10 @@ public final class JobDataNodeLineConvertUtils {
      * @return actual and logic table name mapper
      */
     public static ActualAndLogicTableNameMapper buildTableNameMapper(final 
JobDataNodeLine dataNodeLine) {
-        Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> map = new 
LinkedHashMap<>();
+        Map<ShardingSphereIdentifier, ShardingSphereIdentifier> map = new 
LinkedHashMap<>();
         for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
             for (DataNode dataNode : each.getDataNodes()) {
-                map.put(new 
CaseInsensitiveIdentifier(dataNode.getTableName()), new 
CaseInsensitiveIdentifier(each.getLogicTableName()));
+                map.put(new ShardingSphereIdentifier(dataNode.getTableName()), 
new ShardingSphereIdentifier(each.getLogicTableName()));
             }
         }
         return new ActualAndLogicTableNameMapper(map);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/ActualAndLogicTableNameMapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/ActualAndLogicTableNameMapper.java
index 7ce95578ac3..b8d7f401ff1 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/ActualAndLogicTableNameMapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/ActualAndLogicTableNameMapper.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.Map;
 
@@ -32,7 +32,7 @@ import java.util.Map;
 @ToString
 public final class ActualAndLogicTableNameMapper {
     
-    private final Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> 
tableNameMap;
+    private final Map<ShardingSphereIdentifier, ShardingSphereIdentifier> 
tableNameMap;
     
     /**
      * Get logic table name.
@@ -40,8 +40,8 @@ public final class ActualAndLogicTableNameMapper {
      * @param actualTableName actual table name
      * @return logic table name
      */
-    public CaseInsensitiveIdentifier getLogicTableName(final String 
actualTableName) {
-        return tableNameMap.get(new 
CaseInsensitiveIdentifier(actualTableName));
+    public ShardingSphereIdentifier getLogicTableName(final String 
actualTableName) {
+        return tableNameMap.get(new ShardingSphereIdentifier(actualTableName));
     }
     
     /**
@@ -51,6 +51,6 @@ public final class ActualAndLogicTableNameMapper {
      * @return contains or not
      */
     public boolean containsTable(final String actualTableName) {
-        return tableNameMap.containsKey(new 
CaseInsensitiveIdentifier(actualTableName));
+        return tableNameMap.containsKey(new 
ShardingSphereIdentifier(actualTableName));
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/TableAndSchemaNameMapper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/TableAndSchemaNameMapper.java
index 4c1f3d40df8..512c72fb8d7 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/TableAndSchemaNameMapper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/ingest/dumper/mapper/TableAndSchemaNameMapper.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper;
 
 import lombok.ToString;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -33,7 +33,7 @@ import java.util.stream.Collectors;
 @ToString
 public final class TableAndSchemaNameMapper {
     
-    private final Map<CaseInsensitiveIdentifier, String> mapping;
+    private final Map<ShardingSphereIdentifier, String> mapping;
     
     public TableAndSchemaNameMapper(final Map<String, String> tableSchemaMap) {
         mapping = null == tableSchemaMap ? Collections.emptyMap() : 
getLogicTableNameMap(tableSchemaMap);
@@ -44,13 +44,13 @@ public final class TableAndSchemaNameMapper {
         mapping = getLogicTableNameMap(tableNameSchemaMap);
     }
     
-    private Map<CaseInsensitiveIdentifier, String> getLogicTableNameMap(final 
Map<String, String> tableSchemaMap) {
-        Map<CaseInsensitiveIdentifier, String> result = new 
HashMap<>(tableSchemaMap.size(), 1F);
+    private Map<ShardingSphereIdentifier, String> getLogicTableNameMap(final 
Map<String, String> tableSchemaMap) {
+        Map<ShardingSphereIdentifier, String> result = new 
HashMap<>(tableSchemaMap.size(), 1F);
         for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
             String tableName = entry.getKey();
             String schemaName = entry.getValue();
             if (null != schemaName) {
-                result.put(new CaseInsensitiveIdentifier(tableName), 
schemaName);
+                result.put(new ShardingSphereIdentifier(tableName), 
schemaName);
             }
         }
         return result;
@@ -63,7 +63,7 @@ public final class TableAndSchemaNameMapper {
      * @return schema name
      */
     public String getSchemaName(final String logicTableName) {
-        return mapping.get(new CaseInsensitiveIdentifier(logicTableName));
+        return mapping.get(new ShardingSphereIdentifier(logicTableName));
     }
     
     /**
@@ -72,7 +72,7 @@ public final class TableAndSchemaNameMapper {
      * @param logicTableName logic table name
      * @return schema name
      */
-    public String getSchemaName(final CaseInsensitiveIdentifier 
logicTableName) {
+    public String getSchemaName(final ShardingSphereIdentifier logicTableName) 
{
         return mapping.get(logicTableName);
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
index 2cde14d35b8..c75f88041fa 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/loader/StandardPipelineTableMetaDataLoader.java
@@ -27,7 +27,7 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineIndex
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -51,11 +51,11 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
     
     private final PipelineDataSource dataSource;
     
-    private final Map<CaseInsensitiveIdentifier, PipelineTableMetaData> 
tableMetaDataMap = new ConcurrentHashMap<>();
+    private final Map<ShardingSphereIdentifier, PipelineTableMetaData> 
tableMetaDataMap = new ConcurrentHashMap<>();
     
     @Override
     public PipelineTableMetaData getTableMetaData(final String schemaName, 
final String tableName) {
-        PipelineTableMetaData result = tableMetaDataMap.get(new 
CaseInsensitiveIdentifier(tableName));
+        PipelineTableMetaData result = tableMetaDataMap.get(new 
ShardingSphereIdentifier(tableName));
         if (null != result) {
             return result;
         }
@@ -64,7 +64,7 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
         } catch (final SQLException ex) {
             throw new PipelineInternalException(String.format("Load meta data 
for schema '%s' and table '%s' failed", schemaName, tableName), ex);
         }
-        result = tableMetaDataMap.get(new 
CaseInsensitiveIdentifier(tableName));
+        result = tableMetaDataMap.get(new ShardingSphereIdentifier(tableName));
         if (null == result) {
             log.warn("Can not load meta data for table '{}'", tableName);
         }
@@ -74,12 +74,12 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
     private void loadTableMetaData(final String schemaName, final String 
tableNamePattern) throws SQLException {
         try (Connection connection = dataSource.getConnection()) {
             DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(dataSource.getDatabaseType()).getDialectDatabaseMetaData();
-            Map<CaseInsensitiveIdentifier, PipelineTableMetaData> 
tableMetaDataMap = loadTableMetaData0(connection, 
dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, 
tableNamePattern);
+            Map<ShardingSphereIdentifier, PipelineTableMetaData> 
tableMetaDataMap = loadTableMetaData0(connection, 
dialectDatabaseMetaData.isSchemaAvailable() ? schemaName : null, 
tableNamePattern);
             this.tableMetaDataMap.putAll(tableMetaDataMap);
         }
     }
     
-    private Map<CaseInsensitiveIdentifier, PipelineTableMetaData> 
loadTableMetaData0(final Connection connection, final String schemaName, final 
String tableNamePattern) throws SQLException {
+    private Map<ShardingSphereIdentifier, PipelineTableMetaData> 
loadTableMetaData0(final Connection connection, final String schemaName, final 
String tableNamePattern) throws SQLException {
         Collection<String> tableNames = new LinkedList<>();
         try (ResultSet resultSet = 
connection.getMetaData().getTables(connection.getCatalog(), schemaName, 
tableNamePattern, null)) {
             while (resultSet.next()) {
@@ -87,15 +87,15 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
                 tableNames.add(tableName);
             }
         }
-        Map<CaseInsensitiveIdentifier, PipelineTableMetaData> result = new 
LinkedHashMap<>(tableNames.size(), 1F);
+        Map<ShardingSphereIdentifier, PipelineTableMetaData> result = new 
LinkedHashMap<>(tableNames.size(), 1F);
         for (String each : tableNames) {
-            Collection<CaseInsensitiveIdentifier> primaryKeys = 
loadPrimaryKeys(connection, schemaName, each);
-            Map<CaseInsensitiveIdentifier, 
Collection<CaseInsensitiveIdentifier>> uniqueKeys = 
loadUniqueIndexesOfTable(connection, schemaName, each);
-            Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> 
columnMetaDataMap = new LinkedHashMap<>();
+            Collection<ShardingSphereIdentifier> primaryKeys = 
loadPrimaryKeys(connection, schemaName, each);
+            Map<ShardingSphereIdentifier, 
Collection<ShardingSphereIdentifier>> uniqueKeys = 
loadUniqueIndexesOfTable(connection, schemaName, each);
+            Map<ShardingSphereIdentifier, PipelineColumnMetaData> 
columnMetaDataMap = new LinkedHashMap<>();
             try (ResultSet resultSet = 
connection.getMetaData().getColumns(connection.getCatalog(), schemaName, each, 
"%")) {
                 while (resultSet.next()) {
                     int ordinalPosition = resultSet.getInt("ORDINAL_POSITION");
-                    CaseInsensitiveIdentifier columnName = new 
CaseInsensitiveIdentifier(resultSet.getString("COLUMN_NAME"));
+                    ShardingSphereIdentifier columnName = new 
ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME"));
                     if (columnMetaDataMap.containsKey(columnName)) {
                         continue;
                     }
@@ -112,14 +112,14 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
                     .map(entry -> new PipelineIndexMetaData(entry.getKey(), 
entry.getValue().stream().map(columnMetaDataMap::get).collect(Collectors.toList()),
                             
DataConsistencyCheckUtils.compareLists(primaryKeys, entry.getValue())))
                     .collect(Collectors.toList());
-            result.put(new CaseInsensitiveIdentifier(each), new 
PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
+            result.put(new ShardingSphereIdentifier(each), new 
PipelineTableMetaData(each, columnMetaDataMap, uniqueIndexMetaData));
         }
         return result;
     }
     
-    private Map<CaseInsensitiveIdentifier, 
Collection<CaseInsensitiveIdentifier>> loadUniqueIndexesOfTable(final 
Connection connection,
-                                                                               
                            final String schemaName, final String tableName) 
throws SQLException {
-        Map<String, SortedMap<Short, CaseInsensitiveIdentifier>> 
orderedColumnsOfIndexes = new LinkedHashMap<>();
+    private Map<ShardingSphereIdentifier, 
Collection<ShardingSphereIdentifier>> loadUniqueIndexesOfTable(final Connection 
connection,
+                                                                               
                          final String schemaName, final String tableName) 
throws SQLException {
+        Map<String, SortedMap<Short, ShardingSphereIdentifier>> 
orderedColumnsOfIndexes = new LinkedHashMap<>();
         // Set approximate=true to avoid Oracle driver 19 run `analyze table`
         try (ResultSet resultSet = 
connection.getMetaData().getIndexInfo(connection.getCatalog(), schemaName, 
tableName, true, true)) {
             while (resultSet.next()) {
@@ -128,22 +128,22 @@ public final class StandardPipelineTableMetaDataLoader 
implements PipelineTableM
                     continue;
                 }
                 orderedColumnsOfIndexes.computeIfAbsent(indexName, unused -> 
new TreeMap<>()).put(
-                        resultSet.getShort("ORDINAL_POSITION"), new 
CaseInsensitiveIdentifier(resultSet.getString("COLUMN_NAME")));
+                        resultSet.getShort("ORDINAL_POSITION"), new 
ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
             }
         }
-        Map<CaseInsensitiveIdentifier, Collection<CaseInsensitiveIdentifier>> 
result = new LinkedHashMap<>();
-        for (Entry<String, SortedMap<Short, CaseInsensitiveIdentifier>> entry 
: orderedColumnsOfIndexes.entrySet()) {
-            Collection<CaseInsensitiveIdentifier> columnNames = 
result.computeIfAbsent(new CaseInsensitiveIdentifier(entry.getKey()), unused -> 
new LinkedList<>());
+        Map<ShardingSphereIdentifier, Collection<ShardingSphereIdentifier>> 
result = new LinkedHashMap<>();
+        for (Entry<String, SortedMap<Short, ShardingSphereIdentifier>> entry : 
orderedColumnsOfIndexes.entrySet()) {
+            Collection<ShardingSphereIdentifier> columnNames = 
result.computeIfAbsent(new ShardingSphereIdentifier(entry.getKey()), unused -> 
new LinkedList<>());
             columnNames.addAll(entry.getValue().values());
         }
         return result;
     }
     
-    private Collection<CaseInsensitiveIdentifier> loadPrimaryKeys(final 
Connection connection, final String schemaName, final String tableName) throws 
SQLException {
-        SortedMap<Short, CaseInsensitiveIdentifier> result = new TreeMap<>();
+    private Collection<ShardingSphereIdentifier> loadPrimaryKeys(final 
Connection connection, final String schemaName, final String tableName) throws 
SQLException {
+        SortedMap<Short, ShardingSphereIdentifier> result = new TreeMap<>();
         try (ResultSet resultSet = 
connection.getMetaData().getPrimaryKeys(connection.getCatalog(), schemaName, 
tableName)) {
             while (resultSet.next()) {
-                result.put(resultSet.getShort("KEY_SEQ"), new 
CaseInsensitiveIdentifier(resultSet.getString("COLUMN_NAME")));
+                result.put(resultSet.getShort("KEY_SEQ"), new 
ShardingSphereIdentifier(resultSet.getString("COLUMN_NAME")));
             }
         }
         return result.values();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
index 624fae31d5d..895ce1a3fd8 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineIndexMetaData.java
@@ -20,7 +20,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.metadata.model;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.List;
 
@@ -32,7 +32,7 @@ import java.util.List;
 @ToString
 public final class PipelineIndexMetaData {
     
-    private final CaseInsensitiveIdentifier name;
+    private final ShardingSphereIdentifier name;
     
     private final List<PipelineColumnMetaData> columns;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
index f108aa3469a..c2b8904b138 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaData.java
@@ -24,7 +24,7 @@ import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,7 +46,7 @@ public final class PipelineTableMetaData {
     @NonNull
     private final String name;
     
-    private final Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> 
columnMetaDataMap;
+    private final Map<ShardingSphereIdentifier, PipelineColumnMetaData> 
columnMetaDataMap;
     
     @Getter
     private final List<String> columnNames;
@@ -57,7 +57,7 @@ public final class PipelineTableMetaData {
     @Getter
     private final Collection<PipelineIndexMetaData> uniqueIndexes;
     
-    public PipelineTableMetaData(final String name, final 
Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> columnMetaDataMap, final 
Collection<PipelineIndexMetaData> uniqueIndexes) {
+    public PipelineTableMetaData(final String name, final 
Map<ShardingSphereIdentifier, PipelineColumnMetaData> columnMetaDataMap, final 
Collection<PipelineIndexMetaData> uniqueIndexes) {
         this.name = name;
         this.columnMetaDataMap = columnMetaDataMap;
         List<PipelineColumnMetaData> columnMetaDataList = new 
ArrayList<>(columnMetaDataMap.values());
@@ -87,7 +87,7 @@ public final class PipelineTableMetaData {
      * @return column meta data
      */
     public PipelineColumnMetaData getColumnMetaData(final String columnName) {
-        PipelineColumnMetaData result = columnMetaDataMap.get(new 
CaseInsensitiveIdentifier(columnName));
+        PipelineColumnMetaData result = columnMetaDataMap.get(new 
ShardingSphereIdentifier(columnName));
         if (null == result) {
             log.warn("Can not get column meta data for column name '{}', 
columnNames={}", columnName, columnNames);
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
index 335c6ed8a42..2de8a10e05f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/PipelineJobDataSourcePreparer.java
@@ -73,7 +73,7 @@ public final class PipelineJobDataSourcePreparer {
         PipelinePrepareSQLBuilder pipelineSQLBuilder = new 
PipelinePrepareSQLBuilder(targetDatabaseType);
         Collection<String> createdSchemaNames = new 
HashSet<>(param.getCreateTableConfigurations().size(), 1F);
         for (CreateTableConfiguration each : 
param.getCreateTableConfigurations()) {
-            String targetSchemaName = 
each.getTargetName().getSchemaName().toString();
+            String targetSchemaName = 
each.getTargetName().getSchemaName().getValue();
             if (null == targetSchemaName || 
targetSchemaName.equalsIgnoreCase(defaultSchema) || 
createdSchemaNames.contains(targetSchemaName)) {
                 continue;
             }
@@ -124,9 +124,9 @@ public final class PipelineJobDataSourcePreparer {
                                                  final 
PipelineDataSourceManager dataSourceManager, final SQLParserEngine 
sqlParserEngine) throws SQLException {
         DatabaseType databaseType = 
createTableConfig.getSourceDataSourceConfig().getDatabaseType();
         DataSource sourceDataSource = 
dataSourceManager.getDataSource(createTableConfig.getSourceDataSourceConfig());
-        String schemaName = 
createTableConfig.getSourceName().getSchemaName().toString();
-        String sourceTableName = 
createTableConfig.getSourceName().getTableName().toString();
-        String targetTableName = 
createTableConfig.getTargetName().getTableName().toString();
+        String schemaName = 
createTableConfig.getSourceName().getSchemaName().getValue();
+        String sourceTableName = 
createTableConfig.getSourceName().getTableName().getValue();
+        String targetTableName = 
createTableConfig.getTargetName().getTableName().getValue();
         return new PipelineDDLGenerator().generateLogicDDL(databaseType, 
sourceDataSource, schemaName, sourceTableName, targetTableName, 
sqlParserEngine);
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
index 25d17e637be..bca5d267888 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/inventory/splitter/InventoryDumperContextSplitter.java
@@ -36,7 +36,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.preparer.inventory.calculato
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql.PipelinePrepareSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.util.PipelineJdbcUtils;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
@@ -73,7 +73,7 @@ public final class InventoryDumperContextSplitter {
                 .stream().map(entry -> 
createTableSpLitDumperContext(entry.getKey(), 
entry.getValue())).collect(Collectors.toList());
     }
     
-    private InventoryDumperContext createTableSpLitDumperContext(final 
CaseInsensitiveIdentifier actualTableName, final CaseInsensitiveIdentifier 
logicTableName) {
+    private InventoryDumperContext createTableSpLitDumperContext(final 
ShardingSphereIdentifier actualTableName, final ShardingSphereIdentifier 
logicTableName) {
         InventoryDumperContext result = new 
InventoryDumperContext(dumperContext.getCommonContext());
         result.setActualTableName(actualTableName.toString());
         result.setLogicTableName(logicTableName.toString());
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
index adff3e358aa..b4a2536b6a4 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/metadata/model/PipelineTableMetaDataTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.metadata.model;
 
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -37,7 +37,7 @@ class PipelineTableMetaDataTest {
     @BeforeEach
     void setUp() {
         PipelineColumnMetaData column = new PipelineColumnMetaData(1, "test", 
Types.INTEGER, "INTEGER", true, true, true);
-        pipelineTableMetaData = new PipelineTableMetaData("test_data", 
Collections.singletonMap(new CaseInsensitiveIdentifier("test"), column), 
Collections.emptySet());
+        pipelineTableMetaData = new PipelineTableMetaData("test_data", 
Collections.singletonMap(new ShardingSphereIdentifier("test"), column), 
Collections.emptySet());
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
index 8d1dedecfe3..bc03384ed1a 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java
@@ -46,7 +46,7 @@ import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropert
 import 
org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 
 import java.io.Serializable;
@@ -141,7 +141,7 @@ public final class MySQLIncrementalDumper extends 
AbstractPipelineLifecycleRunna
     }
     
     private PipelineTableMetaData getPipelineTableMetaData(final String 
actualTableName) {
-        CaseInsensitiveIdentifier logicTableName = 
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+        ShardingSphereIdentifier logicTableName = 
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
         return 
metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName),
 actualTableName);
     }
     
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
index ac1469b6e66..12ef402c3c1 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java
@@ -20,8 +20,8 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.dumper;
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.channel.memory.MemoryPipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
@@ -31,13 +31,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.record.Record;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
-import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent;
 import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.position.MySQLBinlogPosition;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.test.fixture.jdbc.MockedDriver;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -103,7 +103,7 @@ class MySQLIncrementalDumperTest {
         poolProps.put("password", "root");
         DumperCommonContext commonContext = new DumperCommonContext(null,
                 new StandardPipelineDataSourceConfiguration(poolProps),
-                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
CaseInsensitiveIdentifier("t_order"), new 
CaseInsensitiveIdentifier("t_order"))),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ShardingSphereIdentifier("t_order"), new ShardingSphereIdentifier("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
         return new IncrementalDumperContext(commonContext, null, false);
     }
@@ -120,8 +120,8 @@ class MySQLIncrementalDumperTest {
         }
     }
     
-    private Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> 
mockOrderColumnsMetaDataMap() {
-        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData -> 
new CaseInsensitiveIdentifier(metaData.getName()), Function.identity()));
+    private Map<ShardingSphereIdentifier, PipelineColumnMetaData> 
mockOrderColumnsMetaDataMap() {
+        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData -> 
new ShardingSphereIdentifier(metaData.getName()), Function.identity()));
     }
     
     private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
index 1857c9312e2..459dedfecab 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverter.java
@@ -33,7 +33,7 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.UpdateRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.WriteRowEvent;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.List;
 
@@ -91,7 +91,7 @@ public final class WALEventConverter {
     }
     
     private PipelineTableMetaData getPipelineTableMetaData(final String 
actualTableName) {
-        CaseInsensitiveIdentifier logicTableName = 
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
+        ShardingSphereIdentifier logicTableName = 
dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(actualTableName);
         return 
metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName),
 actualTableName);
     }
     
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumperTest.java
index e3e89473c5e..47f39db241d 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/dumper/PostgreSQLIncrementalDumperTest.java
@@ -26,13 +26,13 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.In
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.position.PostgreSQLIncrementalPositionManager;
-import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.position.slot.PostgreSQLSlotNameGenerator;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.PostgreSQLLogicalReplication;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.WALPosition;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.decode.PostgreSQLLogSequenceNumber;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.position.PostgreSQLIncrementalPositionManager;
+import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.position.slot.PostgreSQLSlotNameGenerator;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.test.mock.AutoMockExtension;
 import org.apache.shardingsphere.test.mock.StaticMockSettings;
 import org.junit.jupiter.api.AfterEach;
@@ -119,7 +119,7 @@ class PostgreSQLIncrementalDumperTest {
         poolProps.put("password", password);
         DumperCommonContext commonContext = new DumperCommonContext(null,
                 new StandardPipelineDataSourceConfiguration(poolProps),
-                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
CaseInsensitiveIdentifier("t_order_0"), new 
CaseInsensitiveIdentifier("t_order"))),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ShardingSphereIdentifier("t_order_0"), new 
ShardingSphereIdentifier("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
         return new IncrementalDumperContext(commonContext, "0101123456", 
false);
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
index b4294fe387e..fb3006a92d2 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/incremental/wal/WALEventConverterTest.java
@@ -19,8 +19,8 @@ package 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wa
 
 import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperationType;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.DumperCommonContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.ActualAndLogicTableNameMapper;
@@ -40,7 +40,7 @@ import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.UpdateRowEvent;
 import 
org.apache.shardingsphere.data.pipeline.postgresql.ingest.incremental.wal.event.WriteRowEvent;
 import 
org.apache.shardingsphere.infra.exception.generic.UnsupportedSQLOperationException;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -93,7 +93,7 @@ class WALEventConverterTest {
         poolProps.put("password", "root");
         DumperCommonContext commonContext = new DumperCommonContext(null,
                 new StandardPipelineDataSourceConfiguration(poolProps),
-                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
CaseInsensitiveIdentifier("t_order"), new 
CaseInsensitiveIdentifier("t_order"))),
+                new ActualAndLogicTableNameMapper(Collections.singletonMap(new 
ShardingSphereIdentifier("t_order"), new ShardingSphereIdentifier("t_order"))),
                 new TableAndSchemaNameMapper(Collections.emptyMap()));
         return new IncrementalDumperContext(commonContext, null, false);
     }
@@ -110,8 +110,8 @@ class WALEventConverterTest {
         }
     }
     
-    private Map<CaseInsensitiveIdentifier, PipelineColumnMetaData> 
mockOrderColumnsMetaDataMap() {
-        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData -> 
new CaseInsensitiveIdentifier(metaData.getName()), Function.identity()));
+    private Map<ShardingSphereIdentifier, PipelineColumnMetaData> 
mockOrderColumnsMetaDataMap() {
+        return 
mockOrderColumnsMetaDataList().stream().collect(Collectors.toMap(metaData -> 
new ShardingSphereIdentifier(metaData.getName()), Function.identity()));
     }
     
     private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index a55cfce2f15..b8722ff9ca3 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -132,8 +132,7 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(dataNode.getSchemaName(), 
dataNode.getTableName());
         ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(),
 dataNode.getTableName()));
         List<String> columnNames = tableMetaData.getColumnNames();
-        List<PipelineColumnMetaData> uniqueKeys = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(
-                sourceTable.getSchemaName().toString(), 
sourceTable.getTableName().toString(), metaDataLoader);
+        List<PipelineColumnMetaData> uniqueKeys = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName().getValue(),
 sourceTable.getTableName().getValue(), metaDataLoader);
         TableInventoryCheckParameter param = new TableInventoryCheckParameter(
                 jobConfig.getJobId(), sourceDataSource, targetDataSource, 
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, 
progressContext);
         TableInventoryChecker tableInventoryChecker = 
tableChecker.buildTableInventoryChecker(param);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 291f7dea30d..0cd02fe9260 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -188,7 +188,7 @@ class CDCE2EIT {
     
     private void assertDataMatched(final PipelineDataSource sourceDataSource, 
final PipelineDataSource targetDataSource, final CaseInsensitiveQualifiedTable 
schemaTableName) {
         StandardPipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(targetDataSource);
-        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().toString(), 
schemaTableName.getTableName().toString());
+        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getValue(), 
schemaTableName.getTableName().getValue());
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
         ConsistencyCheckJobItemProgressContext progressContext = new 
ConsistencyCheckJobItemProgressContext("", 0, 
sourceDataSource.getDatabaseType().getType());
         TableInventoryCheckParameter param = new 
TableInventoryCheckParameter("", sourceDataSource, targetDataSource, 
schemaTableName, schemaTableName,

Reply via email to