This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b5e5b9d079 Extraction of code for reuse at CDC   (#24578)
5b5e5b9d079 is described below

commit 5b5e5b9d0790dab04a9b80c0b4f845d8045106b5
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Mar 14 20:27:03 2023 +0800

    Extraction of code for reuse at CDC   (#24578)
    
    * Extraction of code for reuse at CDC
    
    * Adjust SocketSinkImporterConnector database parameter
    
    * Remove unused unit test
    
    * Improve parameter name
    
    * Rename
---
 .../connector/SocketSinkImporterConnector.java     |   9 +-
 .../data/pipeline/cdc/core/job/CDCJob.java         |   4 +
 .../data/pipeline/cdc/util/CDCSchemaTableUtil.java | 103 ++++++++++++++
 .../data/pipeline/cdc/util/CDCTableRuleUtil.java   |  43 ++++++
 .../importer/SocketSinkImporterCreatorTest.java    |   3 +-
 .../pipeline/cdc/util/CDCSchemaTableUtilTest.java  |  95 +++++++++++++
 .../cdc/util/DataRecordResultConvertUtilTest.java  |  74 ++++++++++
 .../backend/handler/cdc/CDCBackendHandler.java     |  74 ++--------
 .../backend/handler/cdc/CDCBackendHandlerTest.java | 151 ---------------------
 9 files changed, 333 insertions(+), 223 deletions(-)

diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 58d250d09bf..5a27f961474 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.cdc.core.connector;
 
 import io.netty.channel.Channel;
-import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.SneakyThrows;
@@ -38,6 +37,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertU
 import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
 import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 
 import java.util.Collection;
 import java.util.Comparator;
@@ -69,8 +69,7 @@ public final class SocketSinkImporterConnector implements 
ImporterConnector {
     @Setter
     private volatile boolean incrementalTaskRunning = true;
     
-    @Getter
-    private final String database;
+    private final ShardingSphereDatabase database;
     
     private final Channel channel;
     
@@ -86,7 +85,7 @@ public final class SocketSinkImporterConnector implements 
ImporterConnector {
     
     private Thread incrementalImporterTask;
     
-    public SocketSinkImporterConnector(final Channel channel, final String 
database, final int jobShardingCount, final Collection<String> schemaTableNames,
+    public SocketSinkImporterConnector(final Channel channel, final 
ShardingSphereDatabase database, final int jobShardingCount, final 
Collection<String> schemaTableNames,
                                        final Comparator<DataRecord> 
dataRecordComparator) {
         this.channel = channel;
         this.database = database;
@@ -142,7 +141,7 @@ public final class SocketSinkImporterConnector implements 
ImporterConnector {
                 continue;
             }
             DataRecord dataRecord = (DataRecord) each;
-            
records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database, 
tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
+            
records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database.getName(),
 tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
         }
         String ackId = 
CDCAckHolder.getInstance().bindAckIdWithPosition(importerDataRecordMap);
         DataRecordResult dataRecordResult = 
DataRecordResult.newBuilder().addAllRecords(records).setAckId(ackId).build();
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 4e10f0ca63d..9193c4e7c2e 100644
--- 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -34,6 +34,7 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurat
 import 
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
 import 
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 
@@ -77,5 +78,8 @@ public final class CDCJob extends AbstractSimplePipelineJob {
     @Override
     protected void doClean() {
         dataSourceManager.close();
+        if (importerConnector instanceof AutoCloseable) {
+            CloseUtil.closeQuietly((AutoCloseable) importerConnector);
+        }
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtil.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtil.java
new file mode 100644
index 00000000000..978d8a11d9c
--- /dev/null
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import 
org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
+import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * CDC schema table util.
+ */
+public final class CDCSchemaTableUtil {
+    
+    /**
+     * Parse table expression with schema.
+     *
+     * @param database database
+     * @param schemaTables schema tables
+     * @return map key is schema, value is table names
+     */
+    public static Map<String, Set<String>> 
parseTableExpressionWithSchema(final ShardingSphereDatabase database, final 
Collection<SchemaTable> schemaTables) {
+        Map<String, Set<String>> result = new HashMap<>();
+        Collection<String> systemSchemas = 
database.getProtocolType().getSystemSchemas();
+        Optional<SchemaTable> allSchemaTablesOptional = 
schemaTables.stream().filter(each -> "*".equals(each.getTable()) && 
("*".equals(each.getSchema()) || each.getSchema().isEmpty())).findFirst();
+        if (allSchemaTablesOptional.isPresent()) {
+            for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
+                if (systemSchemas.contains(entry.getKey())) {
+                    continue;
+                }
+                entry.getValue().getAllTableNames().forEach(tableName -> 
result.computeIfAbsent(entry.getKey(), ignored -> new 
HashSet<>()).add(tableName));
+            }
+            return result;
+        }
+        for (SchemaTable each : schemaTables) {
+            if ("*".equals(each.getSchema())) {
+                for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
+                    if (systemSchemas.contains(entry.getKey())) {
+                        continue;
+                    }
+                    
entry.getValue().getAllTableNames().stream().filter(tableName -> 
tableName.equals(each.getTable())).findFirst()
+                            .ifPresent(tableName -> 
result.computeIfAbsent(entry.getKey(), ignored -> new 
HashSet<>()).add(tableName));
+                }
+            } else if ("*".equals(each.getTable())) {
+                String schemaName = each.getSchema().isEmpty() ? 
getDefaultSchema(database.getProtocolType()) : each.getSchema();
+                ShardingSphereSchema schema = database.getSchema(schemaName);
+                if (null == schema) {
+                    throw new SchemaNotFoundException(each.getSchema());
+                }
+                schema.getAllTableNames().forEach(tableName -> 
result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+            } else {
+                result.computeIfAbsent(each.getSchema(), ignored -> new 
HashSet<>()).add(each.getTable());
+            }
+        }
+        return result;
+    }
+    
+    private static String getDefaultSchema(final DatabaseType databaseType) {
+        if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+            return null;
+        }
+        return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+    }
+    
+    /**
+     * Parse table expression without schema.
+     *
+     * @param database database
+     * @param tableNames table names
+     * @return parsed table names
+     */
+    public static Collection<String> parseTableExpressionWithoutSchema(final 
ShardingSphereDatabase database, final List<String> tableNames) {
+        Optional<String> allTablesOptional = 
tableNames.stream().filter("*"::equals).findFirst();
+        Set<String> allTableNames = new 
HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+        return allTablesOptional.isPresent() ? allTableNames : new 
HashSet<>(tableNames);
+    }
+}
diff --git 
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtil.java
 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtil.java
new file mode 100644
index 00000000000..bf0908b7935
--- /dev/null
+++ 
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+
+import java.util.List;
+
+/**
+ * CDC table rule util.
+ */
+public final class CDCTableRuleUtil {
+    
+    /**
+     * Get actual data nodes.
+     *
+     * @param shardingRule sharding rule
+     * @param tableName table name
+     * @return data nodes
+     */
+    public static List<DataNode> getActualDataNodes(final ShardingRule 
shardingRule, final String tableName) {
+        TableRule tableRule = shardingRule.getTableRule(tableName);
+        // TODO support virtual data source name
+        return tableRule.getActualDataNodes();
+    }
+}
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
index b04cda6390f..9047fc2edcd 100644
--- 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
 import 
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
 import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -41,7 +42,7 @@ public final class SocketSinkImporterCreatorTest {
     
     @Test
     public void assertCreateCDCImporter() {
-        SocketSinkImporterConnector importerConnector = new 
SocketSinkImporterConnector(mock(Channel.class), "foo_db", 1, 
Collections.emptyList(), null);
+        SocketSinkImporterConnector importerConnector = new 
SocketSinkImporterConnector(mock(Channel.class), 
mock(ShardingSphereDatabase.class), 1, Collections.emptyList(), null);
         assertThat(TypedSPILoader.getService(ImporterCreator.class, 
"Socket").createImporter(importerConfig, importerConnector, null, null, null), 
instanceOf(SocketSinkImporter.class));
     }
 }
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilTest.java
new file mode 100644
index 00000000000..8d560b14649
--- /dev/null
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public final class CDCSchemaTableUtilTest {
+    
+    @Test
+    public void assertParseTableExpression() {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("public", mockedPublicSchema());
+        schemas.put("test", mockedTestSchema());
+        ShardingSphereDatabase database = new 
ShardingSphereDatabase("sharding_db", new OpenGaussDatabaseType(), null, null, 
schemas);
+        List<SchemaTable> schemaTables = 
Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+                
SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+        Map<String, Set<String>> expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Arrays.asList("t_order_item", 
"t_order_item2")));
+        expected.put("public", Collections.singleton("t_order"));
+        Map<String, Set<String>> actual = 
CDCSchemaTableUtil.parseTableExpressionWithSchema(database, schemaTables);
+        assertThat(actual, is(expected));
+        schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+        actual = CDCSchemaTableUtil.parseTableExpressionWithSchema(database, 
schemaTables);
+        expected = Collections.singletonMap("", 
Collections.singleton("t_order"));
+        assertThat(actual, is(expected));
+        schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+        actual = CDCSchemaTableUtil.parseTableExpressionWithSchema(database, 
schemaTables);
+        expected = new HashMap<>();
+        expected.put("public", Collections.singleton("t_order"));
+        assertThat(actual, is(expected));
+    }
+    
+    private ShardingSphereSchema mockedPublicSchema() {
+        Map<String, ShardingSphereTable> tables = new HashMap<>();
+        tables.put("t_order", mock(ShardingSphereTable.class));
+        tables.put("t_order2", mock(ShardingSphereTable.class));
+        return new ShardingSphereSchema(tables, Collections.emptyMap());
+    }
+    
+    private ShardingSphereSchema mockedTestSchema() {
+        Map<String, ShardingSphereTable> tables = new HashMap<>();
+        tables.put("t_order_item", mock(ShardingSphereTable.class));
+        tables.put("t_order_item2", mock(ShardingSphereTable.class));
+        return new ShardingSphereSchema(tables, Collections.emptyMap());
+    }
+    
+    @Test
+    public void assertParseTableExpressionWithoutSchema() {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("sharding_db", mockedPublicSchema());
+        ShardingSphereDatabase database = new 
ShardingSphereDatabase("sharding_db", new MySQLDatabaseType(), null, null, 
schemas);
+        List<String> schemaTables = Collections.singletonList("*");
+        Collection<String> actualWildcardTable = 
CDCSchemaTableUtil.parseTableExpressionWithoutSchema(database, schemaTables);
+        Set<String> expectedWildcardTable = new 
HashSet<>(Arrays.asList("t_order", "t_order2"));
+        assertThat(actualWildcardTable, is(expectedWildcardTable));
+        schemaTables = Collections.singletonList("t_order");
+        Collection<String> actualSingleTable = 
CDCSchemaTableUtil.parseTableExpressionWithoutSchema(database, schemaTables);
+        Set<String> expectedSingleTable = new 
HashSet<>(Collections.singletonList("t_order"));
+        assertThat(actualSingleTable, is(expectedSingleTable));
+    }
+}
diff --git 
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
new file mode 100644
index 00000000000..6126551315a
--- /dev/null
+++ 
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TimestampProto;
+import com.google.protobuf.TypeRegistry;
+import com.google.protobuf.WrappersProto;
+import com.google.protobuf.util.JsonFormat;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponseProtocol;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.Builder;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class DataRecordResultConvertUtilTest {
+    
+    @Test
+    public void assertConvertDataRecordToRecord() throws 
InvalidProtocolBufferException, SQLException {
+        DataRecord dataRecord = new DataRecord(new 
IntegerPrimaryKeyPosition(0, 1), 2);
+        dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false, 
true));
+        dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123), 
false, false));
+        dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false, 
false));
+        dataRecord.addColumn(new Column("item_id", Integer.MAX_VALUE, false, 
false));
+        dataRecord.addColumn(new Column("create_time", LocalTime.now(), false, 
false));
+        Blob mockedBlob = mock(Blob.class);
+        when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new 
byte[]{-1, 0, 1});
+        dataRecord.addColumn(new Column("data_blob", mockedBlob, false, 
false));
+        Clob mockedClob = mock(Clob.class);
+        when(mockedClob.getSubString(anyLong(), 
anyInt())).thenReturn("clob\n");
+        dataRecord.addColumn(new Column("text_clob", mockedClob, false, 
false));
+        dataRecord.addColumn(new Column("update_time", new 
Timestamp(System.currentTimeMillis()), false, false));
+        dataRecord.setTableName("t_order");
+        dataRecord.setType("INSERT");
+        TypeRegistry registry = 
TypeRegistry.newBuilder().add(CDCResponseProtocol.getDescriptor().getFile().getMessageTypes()).add(WrappersProto.getDescriptor().getMessageTypes())
+                .add(TimestampProto.getDescriptor().getMessageTypes()).build();
+        Record expectedRecord = 
DataRecordResultConvertUtil.convertDataRecordToRecord("test", null, dataRecord);
+        String print = 
JsonFormat.printer().usingTypeRegistry(registry).print(expectedRecord);
+        Builder actualRecord = Record.newBuilder();
+        JsonFormat.parser().usingTypeRegistry(registry).merge(print, 
actualRecord);
+        assertEquals(actualRecord.build(), expectedRecord);
+    }
+}
diff --git 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index b5b6017681f..295ab70e631 100644
--- 
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++ 
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -38,20 +38,17 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRe
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtil;
 import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
 import 
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
 import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
 import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
 import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -61,7 +58,6 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -89,16 +85,17 @@ public final class CDCBackendHandler {
         if (null == database) {
             return CDCResponseGenerator.failed(requestId, 
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists", 
requestBody.getDatabase()));
         }
-        Map<String, Collection<String>> schemaTableNameMap;
+        Map<String, Set<String>> schemaTableNameMap;
         Collection<String> tableNames;
         Set<String> schemaTableNames = new HashSet<>();
         if (database.getProtocolType().isSchemaAvailable()) {
-            schemaTableNameMap = getSchemaTableMapWithSchema(database, 
requestBody.getSourceSchemaTablesList());
+            schemaTableNameMap = 
CDCSchemaTableUtil.parseTableExpressionWithSchema(database, 
requestBody.getSourceSchemaTablesList());
             // TODO if different schema have same table names, table name may 
be overwritten, because the table name at sharding rule not contain schema.
             tableNames = 
schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
             schemaTableNameMap.forEach((k, v) -> v.forEach(tableName -> 
schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k, 
tableName))));
         } else {
-            schemaTableNames.addAll(getTableNamesWithoutSchema(database, 
requestBody.getSourceSchemaTablesList()));
+            
schemaTableNames.addAll(CDCSchemaTableUtil.parseTableExpressionWithoutSchema(database,
 requestBody.getSourceSchemaTablesList().stream().map(SchemaTable::getTable)
+                    .collect(Collectors.toList())));
             tableNames = schemaTableNames;
         }
         if (tableNames.isEmpty()) {
@@ -111,7 +108,7 @@ public final class CDCBackendHandler {
         Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
         // TODO need support case-insensitive later
         for (String each : tableNames) {
-            actualDataNodesMap.put(each, 
getActualDataNodes(shardingRule.get(), each));
+            actualDataNodesMap.put(each, 
CDCTableRuleUtil.getActualDataNodes(shardingRule.get(), each));
         }
         boolean decodeWithTx = database.getProtocolType() instanceof 
OpenGaussDatabaseType;
         StreamDataParameter parameter = new 
StreamDataParameter(requestBody.getDatabase(), new 
LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap, 
decodeWithTx);
@@ -121,61 +118,6 @@ public final class CDCBackendHandler {
         return 
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
-    private Map<String, Collection<String>> getSchemaTableMapWithSchema(final 
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
-        Map<String, Collection<String>> result = new HashMap<>();
-        Collection<String> systemSchemas = 
database.getProtocolType().getSystemSchemas();
-        Optional<SchemaTable> allSchemaTablesOptional = 
schemaTables.stream().filter(each -> "*".equals(each.getTable()) && 
"*".equals(each.getSchema())).findFirst();
-        if (allSchemaTablesOptional.isPresent()) {
-            for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
-                if (systemSchemas.contains(entry.getKey())) {
-                    continue;
-                }
-                entry.getValue().getAllTableNames().forEach(tableName -> 
result.computeIfAbsent(entry.getKey(), ignored -> new 
HashSet<>()).add(tableName));
-            }
-            return result;
-        }
-        for (SchemaTable each : schemaTables) {
-            if ("*".equals(each.getSchema())) {
-                for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
-                    if (systemSchemas.contains(entry.getKey())) {
-                        continue;
-                    }
-                    
entry.getValue().getAllTableNames().stream().filter(tableName -> 
tableName.equals(each.getTable())).findFirst()
-                            .ifPresent(tableName -> 
result.computeIfAbsent(entry.getKey(), ignored -> new 
HashSet<>()).add(tableName));
-                }
-            } else if ("*".equals(each.getTable())) {
-                String schemaName = each.getSchema().isEmpty() ? 
getDefaultSchema(database.getProtocolType()) : each.getSchema();
-                ShardingSphereSchema schema = database.getSchema(schemaName);
-                if (null == schema) {
-                    throw new SchemaNotFoundException(each.getSchema());
-                }
-                schema.getAllTableNames().forEach(tableName -> 
result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
-            } else {
-                result.computeIfAbsent(each.getSchema(), ignored -> new 
HashSet<>()).add(each.getTable());
-            }
-        }
-        return result;
-    }
-    
-    private String getDefaultSchema(final DatabaseType databaseType) {
-        if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
-            return null;
-        }
-        return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
-    }
-    
-    private Collection<String> getTableNamesWithoutSchema(final 
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
-        Optional<SchemaTable> allTablesOptional = 
schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
-        Set<String> allTableNames = new 
HashSet<>(database.getSchema(database.getName()).getAllTableNames());
-        return allTablesOptional.isPresent() ? allTableNames : 
schemaTables.stream().map(SchemaTable::getTable).collect(Collectors.toSet());
-    }
-    
-    private List<DataNode> getActualDataNodes(final ShardingRule shardingRule, 
final String logicTableName) {
-        TableRule tableRule = shardingRule.getTableRule(logicTableName);
-        // TODO support virtual data source name
-        return tableRule.getActualDataNodes();
-    }
-    
     /**
      * Get database name by job id.
      *
@@ -209,7 +151,7 @@ public final class CDCBackendHandler {
         Comparator<DataRecord> dataRecordComparator = 
cdcJobConfig.isDecodeWithTX()
                 ? 
DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
                 : null;
-        CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, 
cdcJobConfig.getDatabaseName(), cdcJobConfig.getJobShardingCount(), 
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+        CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel, 
database, cdcJobConfig.getJobShardingCount(), 
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
         PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
         OneOffJobBootstrap oneOffJobBootstrap = new 
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job, 
jobConfigPOJO.toJobConfiguration());
         job.setJobBootstrap(oneOffJobBootstrap);
diff --git 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
 
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
deleted file mode 100644
index 1d96e9c4873..00000000000
--- 
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.handler.cdc;
-
-import io.netty.channel.Channel;
-import 
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import 
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import 
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.test.mock.AutoMockExtension;
-import org.apache.shardingsphere.test.mock.StaticMockSettings;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(AutoMockExtension.class)
-@StaticMockSettings(PipelineContext.class)
-public final class CDCBackendHandlerTest {
-    
-    private final CDCBackendHandler handler = new CDCBackendHandler();
-    
-    @BeforeEach
-    public void setUp() {
-        MetaDataContexts metaDataContexts = new 
MetaDataContexts(mock(MetaDataPersistService.class),
-                new ShardingSphereMetaData(getDatabases(), 
mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new 
Properties())));
-        ContextManager contextManager = new ContextManager(metaDataContexts, 
mock(InstanceContext.class));
-        when(PipelineContext.getContextManager()).thenReturn(contextManager);
-    }
-    
-    private static Map<String, ShardingSphereDatabase> getDatabases() {
-        ShardingSphereDatabase database = mock(ShardingSphereDatabase.class, 
RETURNS_DEEP_STUBS);
-        when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
-        Set<ShardingSphereRule> shardingRule = 
Collections.singleton(mock(ShardingRule.class));
-        when(database.getRuleMetaData().getRules()).thenReturn(shardingRule);
-        return Collections.singletonMap("foo_db", database);
-    }
-    
-    @Test
-    public void assertStreamDataRequestFailed() {
-        CDCRequest request = 
CDCRequest.newBuilder().setRequestId("1").setType(Type.STREAM_DATA).setStreamDataRequestBody(StreamDataRequestBody.newBuilder().setDatabase("none")).build();
-        CDCResponse actualResponse = 
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), 
mock(CDCConnectionContext.class), mock(Channel.class));
-        assertThat(actualResponse.getStatus(), is(Status.FAILED));
-    }
-    
-    @Test
-    public void assertGetSchemaTableMapWithSchema() throws 
NoSuchMethodException {
-        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
-        schemas.put("test", mockSchema());
-        schemas.put("public", mockSchema());
-        ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db", 
new OpenGaussDatabaseType(), null, null, schemas);
-        List<SchemaTable> schemaTables = 
Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
-                
SchemaTable.newBuilder().setSchema("test").setTable("*").build());
-        Map<String, Collection<String>> expected = new HashMap<>();
-        expected.put("test", new HashSet<>(Arrays.asList("t_order", 
"t_order_item")));
-        expected.put("public", new 
HashSet<>(Collections.singletonList("t_order")));
-        Map<String, String> actual = 
getSchemaTableMapWithSchemaResult(database, schemaTables);
-        assertThat(actual, is(expected));
-        schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
-        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
-        expected = Collections.singletonMap("", 
Collections.singleton("t_order"));
-        assertThat(actual, is(expected));
-        schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
-        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
-        expected = new HashMap<>();
-        expected.put("test", new 
HashSet<>(Collections.singletonList("t_order")));
-        expected.put("public", new 
HashSet<>(Collections.singletonList("t_order")));
-        assertThat(actual, is(expected));
-    }
-    
-    private ShardingSphereSchema mockSchema() {
-        Map<String, ShardingSphereTable> tables = new HashMap<>();
-        tables.put("t_order", mock(ShardingSphereTable.class));
-        tables.put("t_order_item", mock(ShardingSphereTable.class));
-        return new ShardingSphereSchema(tables, Collections.emptyMap());
-    }
-    
-    private Map<String, String> getSchemaTableMapWithSchemaResult(final 
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws 
NoSuchMethodException {
-        return 
ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema",
 ShardingSphereDatabase.class, List.class),
-                handler, database, schemaTables);
-    }
-    
-    @Test
-    public void assertGetSchemaTableMapWithoutSchema() throws 
NoSuchMethodException {
-        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
-        schemas.put("foo_db", mockSchema());
-        ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db", 
new MySQLDatabaseType(), null, null, schemas);
-        List<SchemaTable> schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setTable("*").build());
-        Collection<String> actualWildcardTable = 
getSchemaTableMapWithoutSchemaResult(database, schemaTables);
-        Set<String> expectedWildcardTable = new 
HashSet<>(Arrays.asList("t_order", "t_order_item"));
-        assertThat(actualWildcardTable, is(expectedWildcardTable));
-        schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
-        Collection<String> actualSingleTable = 
getSchemaTableMapWithoutSchemaResult(database, schemaTables);
-        Set<String> expectedSingleTable = new 
HashSet<>(Collections.singletonList("t_order"));
-        assertThat(actualSingleTable, is(expectedSingleTable));
-    }
-    
-    private Collection<String> getSchemaTableMapWithoutSchemaResult(final 
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws 
NoSuchMethodException {
-        return 
ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getTableNamesWithoutSchema",
 ShardingSphereDatabase.class, List.class),
-                handler, database, schemaTables);
-    }
-}


Reply via email to