sandynz commented on code in PR #23320:
URL: https://github.com/apache/shardingsphere/pull/23320#discussion_r1061156357


##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/AbstractSQLBuilder.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+import com.google.common.base.Strings;
+import lombok.Getter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record.TableMetaData;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public abstract class AbstractSQLBuilder implements SQLBuilder {

Review Comment:
   Class javadoc is required



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/sqlbuilder/SQLBuilderFactory.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder;
+
+/**
+ * SQL builder factory.
+ */
+public final class SQLBuilderFactory {
+    
+    /**
+     * Get sql builder.
+     *
+     * @param databaseType database type
+     * @return SQL builder
+     */
+    public static SQLBuilder getSQLBuilder(final String databaseType) {
+        switch (databaseType) {
+            case "openGauss":
+                return new OpenGaussSQLBuilder();
+            default:
+        }
+        throw new UnsupportedOperationException(String.format("Not supported %s now", databaseType));

Review Comment:
   This `throw` could be moved into the `default:` block.
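
The suggestion can be sketched as follows. This is a minimal illustration rather than the PR's final code: `SQLBuilder` and `OpenGaussSQLBuilder` are empty stand-ins so the snippet compiles on its own, and `SQLBuilderFactorySketch` is a hypothetical name.

```java
// Empty stand-ins for the PR's types (assumptions, only here so the sketch compiles alone).
interface SQLBuilder {
}

final class OpenGaussSQLBuilder implements SQLBuilder {
}

final class SQLBuilderFactorySketch {
    
    private SQLBuilderFactorySketch() {
    }
    
    static SQLBuilder getSQLBuilder(final String databaseType) {
        switch (databaseType) {
            case "openGauss":
                return new OpenGaussSQLBuilder();
            default:
                // Throwing inside default keeps the unsupported case next to the supported ones,
                // and no statement is needed after the switch.
                throw new UnsupportedOperationException(String.format("Not supported %s now", databaseType));
        }
    }
}
```

Because every branch either returns or throws, the compiler accepts the method without a trailing statement after the `switch`.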



##########
kernel/data-pipeline/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/task/CDCInventoryIncrementalTasksRunner.java:
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.core.task;
+
+import org.apache.shardingsphere.data.pipeline.api.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.core.task.IncrementalTask;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryIncrementalTasksRunner;
+import org.apache.shardingsphere.data.pipeline.core.task.InventoryTask;
+
+import java.util.Collection;
+
+public final class CDCInventoryIncrementalTasksRunner extends InventoryIncrementalTasksRunner {

Review Comment:
   1. Class javadoc is required.
   
   2. The name `CDCInventoryIncrementalTasksRunner` could be simplified, e.g. `CDCTasksRunner`.
   



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/example/opengauss/Bootstrap.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.example.opengauss;
+
+import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
+import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.SubscriptionMode;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.CreateSubscriptionRequest.TableName;
+
+import java.util.Collections;
+
+public final class Bootstrap {

Review Comment:
   Could this class be moved to the test sources?



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/ImporterFactory.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+
+/**
+ * Importer factory.
+ */
+public final class ImporterFactory {
+    
+    /**
+     * Get importer.
+     *
+     * @param databaseType database type
+     * @return importer
+     */
+    public static Importer getImporter(final String databaseType) {
+        switch (databaseType) {
+            case "openGauss":
+                return new OpenGaussImporter();
+            default:
+        }
+        return null;
+    }

Review Comment:
   It's better to use SPI so that users can extend this (users cannot modify `ImporterFactory`). A TODO could be added for now.
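
One way to picture the SPI idea is with the JDK's own `java.util.ServiceLoader`. This is only a sketch: the project may prefer its own SPI utilities, and `getDatabaseType()` and `ImporterLoader` are hypothetical names that are not part of the PR.

```java
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical SPI contract; getDatabaseType() is an assumed addition used to select an implementation.
interface Importer {
    
    String getDatabaseType();
}

final class ImporterLoader {
    
    private ImporterLoader() {
    }
    
    // Iterates over implementations registered in META-INF/services/<fully qualified name of Importer>.
    static Optional<Importer> findImporter(final String databaseType) {
        for (Importer each : ServiceLoader.load(Importer.class)) {
            if (each.getDatabaseType().equalsIgnoreCase(databaseType)) {
                return Optional.of(each);
            }
        }
        // No registered implementation matches; the caller decides how to react.
        return Optional.empty();
    }
}
```

With this shape, a user could ship an extra implementation in a separate jar and register it in a provider-configuration file, without touching the factory.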



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ProtocolStringList;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilder;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.AnyValueConvert;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * OpenGauss importer.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class OpenGaussImporter implements Importer {
+    
+    private final SQLBuilder sqlBuilder = SQLBuilderFactory.getSQLBuilder("openGauss");
+    
+    private final Connection connection;
+    
+    public OpenGaussImporter() {
+        Properties properties = new Properties();
+        try (InputStream inputStream = OpenGaussImporter.class.getClassLoader().getResourceAsStream("env/opengauss.properties")) {

Review Comment:
   `env/opengauss.properties` lives in the module (and in the jar once it is uploaded to the Maven repository), so how can users override the settings?
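
One possible answer, sketched under assumptions: keep the bundled file as defaults and let JVM system properties override individual keys. The `cdc.opengauss.` prefix and the `ImporterSettings` class are hypothetical and do not appear in the PR.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

final class ImporterSettings {
    
    // Hypothetical prefix for overrides, e.g. -Dcdc.opengauss.url=db.example.internal
    private static final String OVERRIDE_PREFIX = "cdc.opengauss.";
    
    private ImporterSettings() {
    }
    
    static Properties load(final String resourceName) throws IOException {
        Properties defaults = new Properties();
        try (InputStream inputStream = ImporterSettings.class.getClassLoader().getResourceAsStream(resourceName)) {
            // getResourceAsStream returns null when the resource is absent; treat that as "no defaults".
            if (null != inputStream) {
                defaults.load(inputStream);
            }
        }
        return applyOverrides(defaults, System.getProperties());
    }
    
    // Copies the defaults, then replaces any key that has a prefixed counterpart in overrides.
    static Properties applyOverrides(final Properties defaults, final Properties overrides) {
        Properties result = new Properties();
        result.putAll(defaults);
        for (String each : overrides.stringPropertyNames()) {
            if (each.startsWith(OVERRIDE_PREFIX)) {
                result.setProperty(each.substring(OVERRIDE_PREFIX.length()), overrides.getProperty(each));
            }
        }
        return result;
    }
}
```

Passing the connection settings through a constructor parameter would be another option; the sketch only shows that the packaged file need not be the single source of configuration.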



##########
kernel/data-pipeline/cdc/client/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/client/importer/OpenGaussImporter.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.cdc.client.importer;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ProtocolStringList;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilder;
+import org.apache.shardingsphere.data.pipeline.cdc.client.sqlbuilder.SQLBuilderFactory;
+import org.apache.shardingsphere.data.pipeline.cdc.client.util.AnyValueConvert;
+import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * OpenGauss importer.
+ */
+@RequiredArgsConstructor
+@Slf4j
+public final class OpenGaussImporter implements Importer {
+    
+    private final SQLBuilder sqlBuilder = SQLBuilderFactory.getSQLBuilder("openGauss");
+    
+    private final Connection connection;
+    
+    public OpenGaussImporter() {
+        Properties properties = new Properties();
+        try (InputStream inputStream = OpenGaussImporter.class.getClassLoader().getResourceAsStream("env/opengauss.properties")) {
+            properties.load(inputStream);
+            String url = properties.getProperty("url");
+            String port = properties.getProperty("port");
+            String database = properties.getProperty("database");
+            String username = properties.getProperty("username");
+            String password = properties.getProperty("password");
+            connection = DriverManager.getConnection(String.format("jdbc:opengauss://%s:%s/%s", url, port, database), username, password);
+        } catch (final IOException | SQLException ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+    
+    @Override
+    public void write(final Record record) throws SQLException, InvalidProtocolBufferException {
+        String sql = buildSQL(record);
+        try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
+            List<Any> afterValue = new ArrayList<>(record.getAfterMap().values());
+            ProtocolStringList uniqueKeyNamesList = record.getTableMetaData().getUniqueKeyNamesList();
+            List<String> conditionColumnNames = record.getBeforeMap().keySet().containsAll(uniqueKeyNamesList) ? uniqueKeyNamesList : new ArrayList<>(record.getBeforeMap().keySet());
+            switch (record.getDataChangeType()) {
+                case INSERT:
+                    for (int i = 0; i < afterValue.size(); i++) {
+                        preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(afterValue.get(i)));
+                    }
+                    break;
+                case UPDATE:
+                    for (int i = 0; i < afterValue.size(); i++) {
+                        preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(afterValue.get(i)));
+                    }
+                    for (int i = 0; i < conditionColumnNames.size(); i++) {
+                        preparedStatement.setObject(afterValue.size() + i + 1, AnyValueConvert.convertToObject(record.getBeforeMap().get(conditionColumnNames.get(i))));
+                    }
+                    int updateCount = preparedStatement.executeUpdate();
+                    if (1 != updateCount) {
+                        log.warn("executeUpdate failed, updateCount={}, updateSql={}, updatedColumns={}, conditionColumns={}", updateCount, sql, record.getAfterMap().keySet(), conditionColumnNames);
+                    }
+                    break;
+                case DELETE:
+                    for (int i = 0; i < conditionColumnNames.size(); i++) {
+                        preparedStatement.setObject(i + 1, AnyValueConvert.convertToObject(record.getAfterMap().get(conditionColumnNames.get(i))));
+                    }
+                    preparedStatement.execute();
+                    break;
+                default:
+            }
+            preparedStatement.execute();
+        }
+    }
+    
+    private String buildSQL(final Record record) {
+        switch (record.getDataChangeType()) {
+            case INSERT:
+                return sqlBuilder.buildInsertSQL(record);
+            case UPDATE:
+                return sqlBuilder.buildUpdateSQL(record);
+            case DELETE:
+                return sqlBuilder.buildDeleteSQL(record);
+            default:
+        }
+        return null;

Review Comment:
   1. This `return` could be moved into the `default:` block.
   
   2. The result of `buildSQL` is never checked for null.
   
   3. It's better to return `Optional<String>`.
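
The three points above could combine roughly as follows. This is a self-contained sketch, not the PR's code: `ChangeType` (including its `UNKNOWN` value) and `SQLDispatchSketch` are hypothetical stand-ins, and the placeholder strings replace the real `sqlBuilder.buildInsertSQL(record)` style calls.

```java
import java.util.Optional;

// Hypothetical stand-in for the record's data change type; UNKNOWN is an assumed extra value.
enum ChangeType {
    INSERT, UPDATE, DELETE, UNKNOWN
}

final class SQLDispatchSketch {
    
    private SQLDispatchSketch() {
    }
    
    // Returns Optional.empty() from the default block instead of returning null after the switch.
    static Optional<String> buildSQL(final ChangeType changeType) {
        switch (changeType) {
            case INSERT:
                return Optional.of("INSERT ...");
            case UPDATE:
                return Optional.of("UPDATE ...");
            case DELETE:
                return Optional.of("DELETE ...");
            default:
                return Optional.empty();
        }
    }
    
    // The caller has to handle the empty case explicitly before using the SQL.
    static boolean write(final ChangeType changeType) {
        Optional<String> sql = buildSQL(changeType);
        if (!sql.isPresent()) {
            // Nothing to execute for an unsupported change type.
            return false;
        }
        // A real importer would prepare and execute sql.get() here.
        return true;
    }
}
```

Returning `Optional<String>` makes the "no SQL for this change type" case visible in the signature, so `write` cannot pass a null statement to `prepareStatement` by accident.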
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
