sandynz commented on code in PR #24209:
URL: https://github.com/apache/shardingsphere/pull/24209#discussion_r1109305995


##########
kernel/data-pipeline/cdc/client/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java:
##########
@@ -32,7 +32,7 @@ public final class Bootstrap {
      * @param args args
      */
     public static void main(final String[] args) {
-        ImportDataSourceParameter importDataSourceParam = new 
ImportDataSourceParameter("jdbc:opengauss:localhost:5432/cdc_db?stringtype=unspecified",
 "gaussdb", "Root@123");
+        ImportDataSourceParameter importDataSourceParam = new 
ImportDataSourceParameter("jdbc:opengauss://localhost:5432/cdc_db?stringtype=unspecified",
 "gaussdb", "Root@123");

Review Comment:
   Could we move `Bootstrap` to `example` package?
   
   And then merge other similar implementations to here.
   



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/config/job/CDCJobConfiguration.java:
##########
@@ -36,7 +37,7 @@ public final class CDCJobConfiguration implements 
PipelineJobConfiguration {
     
     private final String database;
     
-    private final List<String> schemaTableNames;
+    private final Collection<String> schemaTableNames;

Review Comment:
   It's better to use `List` not `Collection` in yaml related configuration 
class



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobId.java:
##########
@@ -35,11 +35,11 @@ public final class CDCJobId extends AbstractPipelineJobId {
     
     private final String databaseName;
     
-    private final List<String> tableNames;
+    private final Collection<String> tableNames;

Review Comment:
   Does `tableNames` mean `schemaTableNames`?



##########
proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandler.java:
##########
@@ -98,6 +117,58 @@ public CDCResponse streamData(final String requestId, final 
StreamDataRequestBod
         return 
CDCResponseGenerator.succeedBuilder(requestId).setStreamDataResult(StreamDataResult.newBuilder().setStreamingId(jobId).build()).build();
     }
     
+    private Map<String, Collection<String>> getSchemaTableMapWithSchema(final 
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Map<String, Collection<String>> result = new HashMap<>();
+        Collection<String> systemSchemas = 
database.getProtocolType().getSystemSchemas();
+        Optional<SchemaTable> allSchemaTablesOptional = 
schemaTables.stream().filter(each -> "*".equals(each.getTable()) && 
"*".equals(each.getSchema())).findFirst();
+        if (allSchemaTablesOptional.isPresent()) {
+            for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
+                if (systemSchemas.contains(entry.getKey())) {
+                    continue;
+                }
+                entry.getValue().getAllTableNames().forEach(tableName -> 
result.computeIfAbsent(entry.getKey(), ignored -> new 
HashSet<>()).add(tableName));
+            }
+            return result;
+        }
+        for (SchemaTable each : schemaTables) {
+            if ("*".equals(each.getSchema())) {
+                for (Entry<String, ShardingSphereSchema> entry : 
database.getSchemas().entrySet()) {
+                    if (systemSchemas.contains(entry.getKey())) {
+                        continue;
+                    }
+                    
entry.getValue().getAllTableNames().stream().filter(tableName -> 
tableName.equals(each.getTable())).findFirst()
+                            .ifPresent(tableName -> 
result.computeIfAbsent(entry.getKey(), ignored -> new 
HashSet<>()).add(tableName));
+                }
+            } else if ("*".equals(each.getTable())) {
+                String schemaName = each.getSchema().isEmpty() ? 
getDefaultSchema(database.getProtocolType()) : each.getSchema();
+                ShardingSphereSchema schema = database.getSchema(schemaName);
+                if (null == schema) {
+                    throw new SchemaNotFoundException(each.getSchema());
+                }
+                schema.getAllTableNames().forEach(tableName -> 
result.computeIfAbsent(schemaName, ignored -> new HashSet<>()).add(tableName));
+            } else {
+                result.computeIfAbsent(each.getSchema(), ignored -> new 
HashSet<>()).add(each.getTable());
+            }
+        }
+        return result;
+    }
+    
+    private String getDefaultSchema(final DatabaseType databaseType) {
+        if (!(databaseType instanceof SchemaSupportedDatabaseType)) {
+            return null;
+        }
+        return ((SchemaSupportedDatabaseType) databaseType).getDefaultSchema();
+    }
+    
+    private Collection<String> getTableNamesWithoutSchema(final 
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) {
+        Optional<SchemaTable> allTablesOptional = 
schemaTables.stream().filter(each -> each.getTable().equals("*")).findFirst();
+        Set<String> allTableNames = new 
HashSet<>(database.getSchema(database.getName()).getAllTableNames());
+        if (allTablesOptional.isPresent()) {
+            return allTableNames;
+        }

Review Comment:
   `allTableNames` could be combined in `return`



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
         CDCResponse actualResponse = 
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), 
mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
+    
+    @Test
+    public void assertGetSchemaTableMapWithSchema() throws 
NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("test", mockSchema());
+        schemas.put("public", mockSchema());
+        ShardingSphereDatabase database = new 
ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, 
schemas);

Review Comment:
   Could `PostgreSQLDatabaseType` be `OpenGaussDatabaseType`?



##########
proxy/backend/src/test/java/org/apache/shardingsphere/proxy/backend/handler/cdc/CDCBackendHandlerTest.java:
##########
@@ -91,4 +101,61 @@ public void assertStreamDataRequestFailed() {
         CDCResponse actualResponse = 
handler.streamData(request.getRequestId(), request.getStreamDataRequestBody(), 
mock(CDCConnectionContext.class), mock(Channel.class));
         assertThat(actualResponse.getStatus(), is(Status.FAILED));
     }
+    
+    @Test
+    public void assertGetSchemaTableMapWithSchema() throws 
NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("test", mockSchema());
+        schemas.put("public", mockSchema());
+        ShardingSphereDatabase database = new 
ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, 
schemas);
+        List<SchemaTable> schemaTables = 
Arrays.asList(SchemaTable.newBuilder().setSchema("public").setTable("t_order").build(),
+                
SchemaTable.newBuilder().setSchema("test").setTable("*").build());
+        Map<String, Collection<String>> expected = new HashMap<>();
+        expected.put("test", new HashSet<>(Arrays.asList("t_order", 
"t_order_item")));
+        expected.put("public", new 
HashSet<>(Collections.singletonList("t_order")));
+        Map<String, String> actual = 
getSchemaTableMapWithSchemaResult(database, schemaTables);
+        assertThat(actual, is(expected));
+        schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = Collections.singletonMap("", 
Collections.singleton("t_order"));
+        assertThat(actual, is(expected));
+        schemaTables = 
Collections.singletonList(SchemaTable.newBuilder().setSchema("*").setTable("t_order").build());
+        actual = getSchemaTableMapWithSchemaResult(database, schemaTables);
+        expected = new HashMap<>();
+        expected.put("test", new 
HashSet<>(Collections.singletonList("t_order")));
+        expected.put("public", new 
HashSet<>(Collections.singletonList("t_order")));
+        assertThat(actual, is(expected));
+    }
+    
+    private ShardingSphereSchema mockSchema() {
+        Map<String, ShardingSphereTable> tables = new HashMap<>();
+        tables.put("t_order", mock(ShardingSphereTable.class));
+        tables.put("t_order_item", mock(ShardingSphereTable.class));
+        return new ShardingSphereSchema(tables, Collections.emptyMap());
+    }
+    
+    private Map<String, String> getSchemaTableMapWithSchemaResult(final 
ShardingSphereDatabase database, final List<SchemaTable> schemaTables) throws 
NoSuchMethodException {
+        return 
ReflectionUtil.invokeMethod(handler.getClass().getDeclaredMethod("getSchemaTableMapWithSchema",
 ShardingSphereDatabase.class, List.class),
+                handler, database, schemaTables);
+    }
+    
+    @Test
+    public void assertGetSchemaTableMapWithoutSchema() throws 
NoSuchMethodException {
+        Map<String, ShardingSphereSchema> schemas = new HashMap<>();
+        schemas.put("sharding_db", mockSchema());
+        ShardingSphereDatabase database = new 
ShardingSphereDatabase("sharding_db", new PostgreSQLDatabaseType(), null, null, 
schemas);

Review Comment:
   Could `PostgreSQLDatabaseType` be `MySQLDatabaseType`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to