sandynz commented on code in PR #24209:
URL: https://github.com/apache/shardingsphere/pull/24209#discussion_r1109305995
##########
kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java:
##########
@@ -32,7 +32,7 @@ public final class Bootstrap {
* @param args args
*/
public static void main(final String[] args) {
- ImportDataSourceParameter importDataSourceParam = new
ImportDataSourceParameter("jdbc:opengauss:localhost:5432/cdc_db?stringtype=unspecified",
"gaussdb", "Root@123");
+ ImportDataSourceParameter importDataSourceParam = new
ImportDataSourceParameter("jdbc:opengauss://localhost:5432/cdc_db?stringtype=unspecified",
"gaussdb", "Root@123");
Review Comment:
Could we move `Bootstrap` to the `example` package?
And then merge the other similar implementations into it.
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java:
##########
@@ -36,7 +37,7 @@ public final class CDCJobConfiguration implements
PipelineJobConfiguration {
private final String database;
- private final List<String> schemaTableNames;
+ private final Collection<String> schemaTableNames;
Review Comment:
It's better to use `List` rather than `Collection` in YAML-related
configuration classes.
##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java:
##########
@@ -35,11 +35,11 @@ public final class CDCJobId extends AbstractPipelineJobId {
private final String databaseName;
- private final List<String> tableNames;
+ private final Collection<String> tableNames;
Review Comment:
Does `tableNames` mean `schemaTableNames`?
##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -98,6 +117,58 @@ public CDCResponse streamData(final String requestId, final
StreamDataRequestBod
return
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
}
+ private Map<String, Collection<String>> getSchemaTableMapWithSchema(final
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+ Map<String, Collection<String>> result = new HashMap<>();
+ Collection<String> systemSchemas =
database.getProtocolType().getSystemSchemas();
+ Optional<SchemaTable> allSchemaTablesOptional =
schemaTables.stream().filter(each -> "*".equals(each.getTable()) &&
"*".equals(each.getSchema())).findFirst();
+ if (allSchemaTablesOptional.isPresent()) {
+ for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
+ if (systemSchemas.contains(entry.getKey())) {
+ continue;
+ }
+ entry.getValue().getAllTableNames().forEach(tableName ->
result.computeIfAbsent(entry.getKey(), ignored -> new
HashSet<>()).add(tableName));
+ }
+ return result;
+ }
+ for (SchemaTable each : schemaTables) {
+ if ("*".equals(each.getSchema())) {
+ for (Entry<String, ShardingSphereSchema> entry :
database.getSchemas().entrySet()) {
+ if (systemSchemas.contains(entry.getKey())) {
+ continue;
+ }
+
entry.getValue().getAllTableNames().stream().filter(tableName ->
tableName.equals(each.getTable())).findFirst()
+ .ifPresent(tableName ->
result.computeIfAbsent(entry.getKey(), ignored -> new
HashSet<>()).add(tableName));
+ }
+ } else if ("*".equals(each.getTable())) {
+ String schemaName = each.getSchema().isEmpty() ?
getDefaultSchema(database.getProtocolType()) : each.getSchema();
+ ShardingSphereSchema schema = database.getSchema(schemaName);
+ if (null == schema) {
+ throw new SchemaNotFoundException(each.getSchema());
+ }
+ schema.getAllTableNames().forEach(tableName ->
result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+ } else {
+ result.computeIfAbsent(each.getSchema(), ignored -> new
HashSet<>()).add(each.getTable());
+ }
+ }
+ return result;
+ }
+
+ private String getDefaultSchema(final DatabaseType databaseType) {
+ if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+ return null;
+ }
+ return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+ }
+
+ private Collection<String> getTableNamesWithoutSchema(final
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+ Optional<SchemaTable> allTablesOptional =
schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
+ Set<String> allTableNames = new
HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+ if (allTablesOptional.isPresent()) {
+ return allTableNames;
+ }
Review Comment:
`allTableNames` could be inlined into the `return` statement.
##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
CDCResponse actualResponse =
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(),
mock(CDCConnectionContext.class), mock(Channel.class));
assertThat(actualResponse.getStatus(), is(Status.FAILED));
}
+
+ @Test
+ public void assertGetSchemaTableMapWithSchema() throws
NoSuchMethodException {
+ Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+ schemas.put("test", mockSchema());
+ schemas.put("public", mockSchema());
+ ShardingSphereDatabase database = new
ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null,
schemas);
Review Comment:
Could `PostgreSQLDatabaseType` be `OpenGaussDatabaseType`?
##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
CDCResponse actualResponse =
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(),
mock(CDCConnectionContext.class), mock(Channel.class));
assertThat(actualResponse.getStatus(), is(Status.FAILED));
}
+
+ @Test
+ public void assertGetSchemaTableMapWithSchema() throws
NoSuchMethodException {
+ Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+ schemas.put("test", mockSchema());
+ schemas.put("public", mockSchema());
+ ShardingSphereDatabase database = new
ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null,
schemas);
+ List<SchemaTable> schemaTables =
Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+
SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+ Map<String, Collection<String>> expected = new HashMap<>();
+ expected.put("test", new HashSet<>(Arrays.asList("t_order",
"t_order_item")));
+ expected.put("public", new
HashSet<>(Collections.singletonList("t_order")));
+ Map<String, String> actual =
getSchemaTableMapWithSchemaResult(database, schemaTables);
+ assertThat(actual, is(expected));
+ schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+ actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+ expected = Collections.singletonMap("",
Collections.singleton("t_order"));
+ assertThat(actual, is(expected));
+ schemaTables =
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+ actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+ expected = new HashMap<>();
+ expected.put("test", new
HashSet<>(Collections.singletonList("t_order")));
+ expected.put("public", new
HashSet<>(Collections.singletonList("t_order")));
+ assertThat(actual, is(expected));
+ }
+
+ private ShardingSphereSchema mockSchema() {
+ Map<String, ShardingSphereTable> tables = new HashMap<>();
+ tables.put("t_order", mock(ShardingSphereTable.class));
+ tables.put("t_order_item", mock(ShardingSphereTable.class));
+ return new ShardingSphereSchema(tables, Collections.emptyMap());
+ }
+
+ private Map<String, String> getSchemaTableMapWithSchemaResult(final
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws
NoSuchMethodException {
+ return
ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema",
ShardingSphereDatabase.class, List.class),
+ handler, database, schemaTables);
+ }
+
+ @Test
+ public void assertGetSchemaTableMapWithoutSchema() throws
NoSuchMethodException {
+ Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+ schemas.put("sharding_db", mockSchema());
+ ShardingSphereDatabase database = new
ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null,
schemas);
Review Comment:
Could `PostgreSQLDatabaseType` be `MySQLDatabaseType`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]