This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 5b5e5b9d079 Extraction of code for reuse at CDC (#24578)
5b5e5b9d079 is described below
commit 5b5e5b9d0790dab04a9b80c0b4f845d8045106b5
Author: Xinze Guo <[email protected]>
AuthorDate: Tue Mar 14 20:27:03 2023 +0800
Extraction of code for reuse at CDC (#24578)
* Extraction of code for reuse at CDC
* Adjust SocketSinkImporterConnector database parameter
* Remove unused unit test
* Improve parameter name
* Rename
---
.../connector/SocketSinkImporterConnector.java | 9 +-
.../data/pipeline/cdc/core/job/CDCJob.java | 4 +
.../data/pipeline/cdc/util/CDCSchemaTableUtil.java | 103 ++++++++++++++
.../data/pipeline/cdc/util/CDCTableRuleUtil.java | 43 ++++++
.../importer/SocketSinkImporterCreatorTest.java | 3 +-
.../pipeline/cdc/util/CDCSchemaTableUtilTest.java | 95 +++++++++++++
.../cdc/util/DataRecordResultConvertUtilTest.java | 74 ++++++++++
.../backend/handler/cdc/CDCBackendHandler.java | 74 ++--------
.../backend/handler/cdc/CDCBackendHandlerTest.java | 151 ---------------------
9 files changed, 333 insertions(+), 223 deletions(-)
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
index 58d250d09bf..5a27f961474 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/connector/SocketSinkImporterConnector.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.cdc.core.connector;
import io.netty.channel.Channel;
-import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.SneakyThrows;
@@ -38,6 +37,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.util.DataRecordResultConvertU
import org.apache.shardingsphere.data.pipeline.core.record.RecordUtil;
import org.apache.shardingsphere.data.pipeline.core.util.ThreadUtil;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import java.util.Collection;
import java.util.Comparator;
@@ -69,8 +69,7 @@ public final class SocketSinkImporterConnector implements
ImporterConnector {
@Setter
private volatile boolean incrementalTaskRunning = true;
- @Getter
- private final String database;
+ private final ShardingSphereDatabase database;
private final Channel channel;
@@ -86,7 +85,7 @@ public final class SocketSinkImporterConnector implements
ImporterConnector {
private Thread incrementalImporterTask;
- public SocketSinkImporterConnector(final Channel channel, final String
database, final int jobShardingCount, final Collection<String> schemaTableNames,
+ public SocketSinkImporterConnector(final Channel channel, final
ShardingSphereDatabase database, final int jobShardingCount, final
Collection<String> schemaTableNames,
final Comparator<DataRecord>
dataRecordComparator) {
this.channel = channel;
this.database = database;
@@ -142,7 +141,7 @@ public final class SocketSinkImporterConnector implements
ImporterConnector {
continue;
}
DataRecord dataRecord = (DataRecord) each;
-
records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database,
tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
+
records.add(DataRecordResultConvertUtil.convertDataRecordToRecord(database.getName(),
tableNameSchemaMap.get(dataRecord.getTableName()), dataRecord));
}
String ackId =
CDCAckHolder.getInstance().bindAckIdWithPosition(importerDataRecordMap);
DataRecordResult dataRecordResult =
DataRecordResult.newBuilder().addAllRecords(records).setAckId(ackId).build();
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 4e10f0ca63d..9193c4e7c2e 100644
---
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -34,6 +34,7 @@ import
org.apache.shardingsphere.data.pipeline.cdc.yaml.job.YamlCDCJobConfigurat
import
org.apache.shardingsphere.data.pipeline.core.context.InventoryIncrementalJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.datasource.DefaultPipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
+import org.apache.shardingsphere.data.pipeline.core.util.CloseUtil;
import
org.apache.shardingsphere.data.pipeline.spi.importer.connector.ImporterConnector;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
@@ -77,5 +78,8 @@ public final class CDCJob extends AbstractSimplePipelineJob {
@Override
protected void doClean() {
dataSourceManager.close();
+ if (importerConnector instanceof AutoCloseable) {
+ CloseUtil.closeQuietly((AutoCloseable) importerConnector);
+ }
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtil.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtil.java
new file mode 100644
index 00000000000..978d8a11d9c
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+import org.apache.shardingsphere.infra.database.type.DatabaseType;
+import
org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
+import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * CDC schema table util.
+ */
+public final class CDCSchemaTableUtil {
+
+ /**
+ * Parse table expression with schema.
+ *
+ * @param database database
+ * @param schemaTables schema tables
+ * @return map key is schema, value is table names
+ */
+ public static Map<String, Set<String>>
parseTableExpressionWithSchema(final ShardingSphereDatabase database, final
Collection<SchemaTable> schemaTables) {
+ Map<String, Set<String>> result = new HashMap<>();
+ Collection<String> systemSchemas =
database.getProtocolType().getSystemSchemas();
+ Optional<SchemaTable> allSchemaTablesOptional =
schemaTables.stream().filter(each -> "*".equals(each.getTable()) &&
("*".equals(each.getSchema()) || each.getSchema().isEmpty())).findFirst();
+ if (allSchemaTablesOptional.isPresent()) {
+ for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
+ if (systemSchemas.contains(entry.getKey())) {
+ continue;
+ }
+ entry.getValue().getAllTableNames().forEach(tableName ->
result.computeIfAbsent(entry.getKey(), ignored -> new
HashSet<>()).add(tableName));
+ }
+ return result;
+ }
+ for (SchemaTable each : schemaTables) {
+ if ("*".equals(each.getSchema())) {
+ for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
+ if (systemSchemas.contains(entry.getKey())) {
+ continue;
+ }
+
entry.getValue().getAllTableNames().stream().filter(tableName ->
tableName.equals(each.getTable())).findFirst()
+ .ifPresent(tableName ->
result.computeIfAbsent(entry.getKey(), ignored -> new
HashSet<>()).add(tableName));
+ }
+ } else if ("*".equals(each.getTable())) {
+ String schemaName = each.getSchema().isEmpty() ?
getDefaultSchema(database.getProtocolType()) : each.getSchema();
+ ShardingSphereSchema schema = database.getSchema(schemaName);
+ if (null == schema) {
+ throw new SchemaNotFoundException(each.getSchema());
+ }
+ schema.getAllTableNames().forEach(tableName ->
result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+ } else {
+ result.computeIfAbsent(each.getSchema(), ignored -> new
HashSet<>()).add(each.getTable());
+ }
+ }
+ return result;
+ }
+
+ private static String getDefaultSchema(final DatabaseType databaseType) {
+ if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+ return null;
+ }
+ return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+ }
+
+ /**
+ * Parse table expression without schema.
+ *
+ * @param database database
+ * @param tableNames table names
+ * @return parsed table names
+ */
+ public static Collection<String> parseTableExpressionWithoutSchema(final
ShardingSphereDatabase database, final List<String> tableNames) {
+ Optional<String> allTablesOptional =
tableNames.stream().filter("*"::equals).findFirst();
+ Set<String> allTableNames = new
HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+ return allTablesOptional.isPresent() ? allTableNames : new
HashSet<>(tableNames);
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtil.java
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtil.java
new file mode 100644
index 00000000000..bf0908b7935
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCTableRuleUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.sharding.rule.ShardingRule;
+import org.apache.shardingsphere.sharding.rule.TableRule;
+
+import java.util.List;
+
+/**
+ * CDC table rule util.
+ */
+public final class CDCTableRuleUtil {
+
+ /**
+ * Get actual data nodes.
+ *
+ * @param shardingRule sharding rule
+ * @param tableName table name
+ * @return data nodes
+ */
+ public static List<DataNode> getActualDataNodes(final ShardingRule
shardingRule, final String tableName) {
+ TableRule tableRule = shardingRule.getTableRule(tableName);
+ // TODO support virtual data source name
+ return tableRule.getActualDataNodes();
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
index b04cda6390f..9047fc2edcd 100644
---
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/importer/SocketSinkImporterCreatorTest.java
@@ -21,6 +21,7 @@ import io.netty.channel.Channel;
import
org.apache.shardingsphere.data.pipeline.api.config.ImporterConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.core.connector.SocketSinkImporterConnector;
import org.apache.shardingsphere.data.pipeline.spi.importer.ImporterCreator;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -41,7 +42,7 @@ public final class SocketSinkImporterCreatorTest {
@Test
public void assertCreateCDCImporter() {
- SocketSinkImporterConnector importerConnector = new
SocketSinkImporterConnector(mock(Channel.class), "foo_db", 1,
Collections.emptyList(), null);
+ SocketSinkImporterConnector importerConnector = new
SocketSinkImporterConnector(mock(Channel.class),
mock(ShardingSphereDatabase.class), 1, Collections.emptyList(), null);
assertThat(TypedSPILoader.getService(ImporterCreator.class,
"Socket").createImporter(importerConfig, importerConnector, null, null, null),
instanceOf(SocketSinkImporter.class));
}
}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilTest.java
new file mode 100644
index 00000000000..8d560b14649
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/CDCSchemaTableUtilTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
+import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
+import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
+import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
+import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public final class CDCSchemaTableUtilTest {
+
+ @Test
+ public void assertParseTableExpression() {
+ Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+ schemas.put("public", mockedPublicSchema());
+ schemas.put("test", mockedTestSchema());
+ ShardingSphereDatabase database = new
ShardingSphereDatabase("sharding_db", new OpenGaussDatabaseType(), null, null,
schemas);
+ List<SchemaTable> schemaTables =
Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+
SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+ Map<String, Set<String>> expected = new HashMap<>();
+ expected.put("test", new HashSet<>(Arrays.asList("t_order_item",
"t_order_item2")));
+ expected.put("public", Collections.singleton("t_order"));
+ Map<String, Set<String>> actual =
CDCSchemaTableUtil.parseTableExpressionWithSchema(database, schemaTables);
+ assertThat(actual, is(expected));
+ schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+ actual = CDCSchemaTableUtil.parseTableExpressionWithSchema(database,
schemaTables);
+ expected = Collections.singletonMap("",
Collections.singleton("t_order"));
+ assertThat(actual, is(expected));
+ schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+ actual = CDCSchemaTableUtil.parseTableExpressionWithSchema(database,
schemaTables);
+ expected = new HashMap<>();
+ expected.put("public", Collections.singleton("t_order"));
+ assertThat(actual, is(expected));
+ }
+
+ private ShardingSphereSchema mockedPublicSchema() {
+ Map<String, ShardingSphereTable> tables = new HashMap<>();
+ tables.put("t_order", mock(ShardingSphereTable.class));
+ tables.put("t_order2", mock(ShardingSphereTable.class));
+ return new ShardingSphereSchema(tables, Collections.emptyMap());
+ }
+
+ private ShardingSphereSchema mockedTestSchema() {
+ Map<String, ShardingSphereTable> tables = new HashMap<>();
+ tables.put("t_order_item", mock(ShardingSphereTable.class));
+ tables.put("t_order_item2", mock(ShardingSphereTable.class));
+ return new ShardingSphereSchema(tables, Collections.emptyMap());
+ }
+
+ @Test
+ public void assertParseTableExpressionWithoutSchema() {
+ Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+ schemas.put("sharding_db", mockedPublicSchema());
+ ShardingSphereDatabase database = new
ShardingSphereDatabase("sharding_db", new MySQLDatabaseType(), null, null,
schemas);
+ List<String> schemaTables = Collections.singletonList("*");
+ Collection<String> actualWildcardTable =
CDCSchemaTableUtil.parseTableExpressionWithoutSchema(database, schemaTables);
+ Set<String> expectedWildcardTable = new
HashSet<>(Arrays.asList("t_order", "t_order2"));
+ assertThat(actualWildcardTable, is(expectedWildcardTable));
+ schemaTables = Collections.singletonList("t_order");
+ Collection<String> actualSingleTable =
CDCSchemaTableUtil.parseTableExpressionWithoutSchema(database, schemaTables);
+ Set<String> expectedSingleTable = new
HashSet<>(Collections.singletonList("t_order"));
+ assertThat(actualSingleTable, is(expectedSingleTable));
+ }
+}
diff --git
a/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
new file mode 100644
index 00000000000..6126551315a
--- /dev/null
+++
b/kernel/data-pipeline/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/util/DataRecordResultConvertUtilTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.util;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.TimestampProto;
+import com.google.protobuf.TypeRegistry;
+import com.google.protobuf.WrappersProto;
+import com.google.protobuf.util.JsonFormat;
+import
org.apache.shardingsphere.data.pipeline.api.ingest.position.IntegerPrimaryKeyPosition;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
+import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponseProtocol;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.Builder;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.time.LocalTime;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class DataRecordResultConvertUtilTest {
+
+ @Test
+ public void assertConvertDataRecordToRecord() throws
InvalidProtocolBufferException, SQLException {
+ DataRecord dataRecord = new DataRecord(new
IntegerPrimaryKeyPosition(0, 1), 2);
+ dataRecord.addColumn(new Column("order_id", BigInteger.ONE, false,
true));
+ dataRecord.addColumn(new Column("price", BigDecimal.valueOf(123),
false, false));
+ dataRecord.addColumn(new Column("user_id", Long.MAX_VALUE, false,
false));
+ dataRecord.addColumn(new Column("item_id", Integer.MAX_VALUE, false,
false));
+ dataRecord.addColumn(new Column("create_time", LocalTime.now(), false,
false));
+ Blob mockedBlob = mock(Blob.class);
+ when(mockedBlob.getBytes(anyLong(), anyInt())).thenReturn(new
byte[]{-1, 0, 1});
+ dataRecord.addColumn(new Column("data_blob", mockedBlob, false,
false));
+ Clob mockedClob = mock(Clob.class);
+ when(mockedClob.getSubString(anyLong(),
anyInt())).thenReturn("clob\n");
+ dataRecord.addColumn(new Column("text_clob", mockedClob, false,
false));
+ dataRecord.addColumn(new Column("update_time", new
Timestamp(System.currentTimeMillis()), false, false));
+ dataRecord.setTableName("t_order");
+ dataRecord.setType("INSERT");
+ TypeRegistry registry =
TypeRegistry.newBuilder().add(CDCResponseProtocol.getDescriptor().getFile().getMessageTypes()).add(WrappersProto.getDescriptor().getMessageTypes())
+ .add(TimestampProto.getDescriptor().getMessageTypes()).build();
+ Record expectedRecord =
DataRecordResultConvertUtil.convertDataRecordToRecord("test", null, dataRecord);
+ String print =
JsonFormat.printer().usingTypeRegistry(registry).print(expectedRecord);
+ Builder actualRecord = Record.newBuilder();
+ JsonFormat.parser().usingTypeRegistry(registry).merge(print,
actualRecord);
+ assertEquals(actualRecord.build(), expectedRecord);
+ }
+}
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
index b5b6017681f..295ab70e631 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java
@@ -38,20 +38,17 @@ import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRe
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.StreamDataResult;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCSchemaTableUtil;
+import org.apache.shardingsphere.data.pipeline.cdc.util.CDCTableRuleUtil;
import org.apache.shardingsphere.data.pipeline.core.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import
org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
-import org.apache.shardingsphere.infra.database.type.DatabaseType;
-import
org.apache.shardingsphere.infra.database.type.SchemaSupportedDatabaseType;
import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.exception.SchemaNotFoundException;
import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.sharding.rule.TableRule;
import java.sql.SQLException;
import java.util.Collection;
@@ -61,7 +58,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -89,16 +85,17 @@ public final class CDCBackendHandler {
if (null == database) {
return CDCResponseGenerator.failed(requestId,
CDCResponseErrorCode.SERVER_ERROR, String.format("%s database is not exists",
requestBody.getDatabase()));
}
- Map<String, Collection<String>> schemaTableNameMap;
+ Map<String, Set<String>> schemaTableNameMap;
Collection<String> tableNames;
Set<String> schemaTableNames = new HashSet<>();
if (database.getProtocolType().isSchemaAvailable()) {
- schemaTableNameMap = getSchemaTableMapWithSchema(database,
requestBody.getSourceSchemaTablesList());
+ schemaTableNameMap =
CDCSchemaTableUtil.parseTableExpressionWithSchema(database,
requestBody.getSourceSchemaTablesList());
// TODO if different schema have same table names, table name may
be overwritten, because the table name at sharding rule not contain schema.
tableNames =
schemaTableNameMap.values().stream().flatMap(Collection::stream).collect(Collectors.toList());
schemaTableNameMap.forEach((k, v) -> v.forEach(tableName ->
schemaTableNames.add(k.isEmpty() ? tableName : String.join(".", k,
tableName))));
} else {
- schemaTableNames.addAll(getTableNamesWithoutSchema(database,
requestBody.getSourceSchemaTablesList()));
+
schemaTableNames.addAll(CDCSchemaTableUtil.parseTableExpressionWithoutSchema(database,
requestBody.getSourceSchemaTablesList().stream().map(SchemaTable::getTable)
+ .collect(Collectors.toList())));
tableNames = schemaTableNames;
}
if (tableNames.isEmpty()) {
@@ -111,7 +108,7 @@ public final class CDCBackendHandler {
Map<String, List<DataNode>> actualDataNodesMap = new HashMap<>();
// TODO need support case-insensitive later
for (String each : tableNames) {
- actualDataNodesMap.put(each,
getActualDataNodes(shardingRule.get(), each));
+ actualDataNodesMap.put(each,
CDCTableRuleUtil.getActualDataNodes(shardingRule.get(), each));
}
boolean decodeWithTx = database.getProtocolType() instanceof
OpenGaussDatabaseType;
StreamDataParameter parameter = new
StreamDataParameter(requestBody.getDatabase(), new
LinkedList<>(schemaTableNames), requestBody.getFull(), actualDataNodesMap,
decodeWithTx);
@@ -121,61 +118,6 @@ public final class CDCBackendHandler {
return
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
}
- private Map<String, Collection<String>> getSchemaTableMapWithSchema(final
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
- Map<String, Collection<String>> result = new HashMap<>();
- Collection<String> systemSchemas =
database.getProtocolType().getSystemSchemas();
- Optional<SchemaTable> allSchemaTablesOptional =
schemaTables.stream().filter(each -> "*".equals(each.getTable()) &&
"*".equals(each.getSchema())).findFirst();
- if (allSchemaTablesOptional.isPresent()) {
- for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
- if (systemSchemas.contains(entry.getKey())) {
- continue;
- }
- entry.getValue().getAllTableNames().forEach(tableName ->
result.computeIfAbsent(entry.getKey(), ignored -> new
HashSet<>()).add(tableName));
- }
- return result;
- }
- for (SchemaTable each : schemaTables) {
- if ("*".equals(each.getSchema())) {
- for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
- if (systemSchemas.contains(entry.getKey())) {
- continue;
- }
-
entry.getValue().getAllTableNames().stream().filter(tableName ->
tableName.equals(each.getTable())).findFirst()
- .ifPresent(tableName ->
result.computeIfAbsent(entry.getKey(), ignored -> new
HashSet<>()).add(tableName));
- }
- } else if ("*".equals(each.getTable())) {
- String schemaName = each.getSchema().isEmpty() ?
getDefaultSchema(database.getProtocolType()) : each.getSchema();
- ShardingSphereSchema schema = database.getSchema(schemaName);
- if (null == schema) {
- throw new SchemaNotFoundException(each.getSchema());
- }
- schema.getAllTableNames().forEach(tableName ->
result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
- } else {
- result.computeIfAbsent(each.getSchema(), ignored -> new
HashSet<>()).add(each.getTable());
- }
- }
- return result;
- }
-
- private String getDefaultSchema(final DatabaseType databaseType) {
- if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
- return null;
- }
- return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
- }
-
- private Collection<String> getTableNamesWithoutSchema(final
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
- Optional<SchemaTable> allTablesOptional =
schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
- Set<String> allTableNames = new
HashSet<>(database.getSchema(database.getName()).getAllTableNames());
- return allTablesOptional.isPresent() ? allTableNames :
schemaTables.stream().map(SchemaTable::getTable).collect(Collectors.toSet());
- }
-
- private List<DataNode> getActualDataNodes(final ShardingRule shardingRule,
final String logicTableName) {
- TableRule tableRule = shardingRule.getTableRule(logicTableName);
- // TODO support virtual data source name
- return tableRule.getActualDataNodes();
- }
-
/**
* Get database name by job id.
*
@@ -209,7 +151,7 @@ public final class CDCBackendHandler {
Comparator<DataRecord> dataRecordComparator =
cdcJobConfig.isDecodeWithTX()
?
DataRecordComparatorGenerator.generatorIncrementalComparator(database.getProtocolType())
: null;
- CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel,
cdcJobConfig.getDatabaseName(), cdcJobConfig.getJobShardingCount(),
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
+ CDCJob job = new CDCJob(new SocketSinkImporterConnector(channel,
database, cdcJobConfig.getJobShardingCount(),
cdcJobConfig.getSchemaTableNames(), dataRecordComparator));
PipelineJobCenter.addJob(jobConfigPOJO.getJobName(), job);
OneOffJobBootstrap oneOffJobBootstrap = new
OneOffJobBootstrap(PipelineAPIFactory.getRegistryCenter(), job,
jobConfigPOJO.toJobConfiguration());
job.setJobBootstrap(oneOffJobBootstrap);
diff --git
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
b/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
deleted file mode 100644
index 1d96e9c4873..00000000000
---
a/proxy/backend/core/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.proxy.backend.handler.cdc;
-
-import io.netty.channel.Channel;
-import
org.apache.shardingsphere.data.pipeline.cdc.context.CDCConnectionContext;
-import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CDCRequest.Type;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse;
-import
org.apache.shardingsphere.data.pipeline.cdc.protocol.response.CDCResponse.Status;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.infra.config.props.ConfigurationProperties;
-import org.apache.shardingsphere.infra.database.type.dialect.MySQLDatabaseType;
-import
org.apache.shardingsphere.infra.database.type.dialect.OpenGaussDatabaseType;
-import org.apache.shardingsphere.infra.instance.InstanceContext;
-import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import
org.apache.shardingsphere.infra.metadata.database.rule.ShardingSphereRuleMetaData;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereSchema;
-import
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
-import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
-import org.apache.shardingsphere.infra.util.reflection.ReflectionUtil;
-import org.apache.shardingsphere.mode.manager.ContextManager;
-import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
-import org.apache.shardingsphere.mode.metadata.persist.MetaDataPersistService;
-import org.apache.shardingsphere.sharding.rule.ShardingRule;
-import org.apache.shardingsphere.test.mock.AutoMockExtension;
-import org.apache.shardingsphere.test.mock.StaticMockSettings;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(AutoMockExtension.class)
-@StaticMockSettings(PipelineContext.class)
-public final class CDCBackendHandlerTest {
-
- private final CDCBackendHandler handler = new CDCBackendHandler();
-
- @BeforeEach
- public void setUp() {
- MetaDataContexts metaDataContexts = new
MetaDataContexts(mock(MetaDataPersistService.class),
- new ShardingSphereMetaData(getDatabases(),
mock(ShardingSphereRuleMetaData.class), new ConfigurationProperties(new
Properties())));
- ContextManager contextManager = new ContextManager(metaDataContexts,
mock(InstanceContext.class));
- when(PipelineContext.getContextManager()).thenReturn(contextManager);
- }
-
- private static Map<String, ShardingSphereDatabase> getDatabases() {
- ShardingSphereDatabase database = mock(ShardingSphereDatabase.class,
RETURNS_DEEP_STUBS);
- when(database.getProtocolType()).thenReturn(new MySQLDatabaseType());
- Set<ShardingSphereRule> shardingRule =
Collections.singleton(mock(ShardingRule.class));
- when(database.getRuleMetaData().getRules()).thenReturn(shardingRule);
- return Collections.singletonMap("foo_db", database);
- }
-
- @Test
- public void assertStreamDataRequestFailed() {
- CDCRequest request =
CDCRequest.newBuilder().setRequestId("1").setType(Type.STREAM_DATA).setStreamDataRequestBody(StreamDataRequestBody.newBuilder().setDatabase("none")).build();
- CDCResponse actualResponse =
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(),
mock(CDCConnectionContext.class), mock(Channel.class));
- assertThat(actualResponse.getStatus(), is(Status.FAILED));
- }
-
- @Test
- public void assertGetSchemaTableMapWithSchema() throws
NoSuchMethodException {
- Map<String, ShardingSphereSchema> schemas = new HashMap<>();
- schemas.put("test", mockSchema());
- schemas.put("public", mockSchema());
- ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db",
new OpenGaussDatabaseType(), null, null, schemas);
- List<SchemaTable> schemaTables =
Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
-
SchemaTable.newBuilder().setSchema("test").setTable("*").build());
- Map<String, Collection<String>> expected = new HashMap<>();
- expected.put("test", new HashSet<>(Arrays.asList("t_order",
"t_order_item")));
- expected.put("public", new
HashSet<>(Collections.singletonList("t_order")));
- Map<String, String> actual =
getSchemaTableMapWithSchemaResult(database, schemaTables);
- assertThat(actual, is(expected));
- schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
- actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
- expected = Collections.singletonMap("",
Collections.singleton("t_order"));
- assertThat(actual, is(expected));
- schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
- actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
- expected = new HashMap<>();
- expected.put("test", new
HashSet<>(Collections.singletonList("t_order")));
- expected.put("public", new
HashSet<>(Collections.singletonList("t_order")));
- assertThat(actual, is(expected));
- }
-
- private ShardingSphereSchema mockSchema() {
- Map<String, ShardingSphereTable> tables = new HashMap<>();
- tables.put("t_order", mock(ShardingSphereTable.class));
- tables.put("t_order_item", mock(ShardingSphereTable.class));
- return new ShardingSphereSchema(tables, Collections.emptyMap());
- }
-
- private Map<String, String> getSchemaTableMapWithSchemaResult(final
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws
NoSuchMethodException {
- return
ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema",
ShardingSphereDatabase.class, List.class),
- handler, database, schemaTables);
- }
-
- @Test
- public void assertGetSchemaTableMapWithoutSchema() throws
NoSuchMethodException {
- Map<String, ShardingSphereSchema> schemas = new HashMap<>();
- schemas.put("foo_db", mockSchema());
- ShardingSphereDatabase database = new ShardingSphereDatabase("foo_db",
new MySQLDatabaseType(), null, null, schemas);
- List<SchemaTable> schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setTable("*").build());
- Collection<String> actualWildcardTable =
getSchemaTableMapWithoutSchemaResult(database, schemaTables);
- Set<String> expectedWildcardTable = new
HashSet<>(Arrays.asList("t_order", "t_order_item"));
- assertThat(actualWildcardTable, is(expectedWildcardTable));
- schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
- Collection<String> actualSingleTable =
getSchemaTableMapWithoutSchemaResult(database, schemaTables);
- Set<String> expectedSingleTable = new
HashSet<>(Collections.singletonList("t_order"));
- assertThat(actualSingleTable, is(expectedSingleTable));
- }
-
- private Collection<String> getSchemaTableMapWithoutSchemaResult(final
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws
NoSuchMethodException {
- return
ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getTableNamesWithoutSchema",
ShardingSphereDatabase.class, List.class),
- handler, database, schemaTables);
- }
-}