sandynz commented on a change in pull request #12318:
URL: https://github.com/apache/shardingsphere/pull/12318#discussion_r706121466



##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussPositionInitializer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.component;
+
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
+import 
org.apache.shardingsphere.scaling.opengauss.wal.event.OpenGaussLogSequenceNumber;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.opengauss.replication.LogSequenceNumber;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * OpenGauss wal position initializer.
+ */
+public final class OpenGaussPositionInitializer implements PositionInitializer 
{
+    
+    @Override
+    public WalPosition init(final DataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            return getWalPosition(connection);
+        }
+    }
+    
+    @Override
+    public WalPosition init(final String data) {
+        return new WalPosition(new OpenGaussLogSequenceNumber(
+                LogSequenceNumber.valueOf(Long.parseLong(data)))
+        );
+    }
+
+    private WalPosition getWalPosition(final Connection connection) throws 
SQLException {
+        try (PreparedStatement ps = 
connection.prepareStatement(getSql(connection));
+             ResultSet rs = ps.executeQuery()) {
+            rs.next();
+            return new WalPosition(
+                    new OpenGaussLogSequenceNumber(
+                            LogSequenceNumber.valueOf(rs.getString(1))
+                    )
+            );

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal;
+
+import 
org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
+import 
org.apache.shardingsphere.scaling.postgresql.wal.event.LogSequenceNumberBase;
+import org.opengauss.PGProperty;
+import org.opengauss.jdbc.PgConnection;
+import org.opengauss.replication.LogSequenceNumber;
+import org.opengauss.replication.PGReplicationStream;
+import org.opengauss.util.PSQLException;
+
+import java.sql.CallableStatement;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * OpenGauss logical replication.
+ */
+public final class OpenGaussLogicalReplication {
+
+    public static final String SLOT_NAME = "sharding_scaling";
+
+    public static final String DECODE_PLUGIN = "test_decoding";
+
+    public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710";
+
+    /**
+     * Create OpenGauss connection.
+     *
+     * @param jdbcDataSourceConfig JDBC data source configuration
+     * @return OpenGauss connection
+     * @throws SQLException sql exception
+     */
+    public Connection createPgConnection(final 
StandardJDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+        return createConnection(jdbcDataSourceConfig);
+    }
+    
+    private Connection createConnection(final 
StandardJDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException {
+        Properties props = new Properties();
+        PGProperty.USER.set(props, 
jdbcDataSourceConfig.getHikariConfig().getUsername());
+        PGProperty.PASSWORD.set(props, 
jdbcDataSourceConfig.getHikariConfig().getPassword());
+        PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
+        PGProperty.REPLICATION.set(props, "database");
+        PGProperty.PREFER_QUERY_MODE.set(props, "simple");
+        return 
DriverManager.getConnection(jdbcDataSourceConfig.getHikariConfig().getJdbcUrl(),
 props);
+    }
+    
+    /**
+     * Create OpenGauss replication stream.
+     *
+     * @param pgConnection OpenGauss connection
+     * @param startPosition start position
+     * @return replication stream
+     * @throws SQLException sql exception
+     */
+    public PGReplicationStream createReplicationStream(final PgConnection 
pgConnection, final LogSequenceNumberBase startPosition) throws SQLException {
+        return pgConnection.getReplicationAPI()
+                .replicationStream()
+                .logical()
+                .withSlotName(SLOT_NAME)
+                .withSlotOption("include-xids", true)
+                .withSlotOption("skip-empty-xacts", true)
+                .withStartPosition((LogSequenceNumber) startPosition.get())
+                .start();
+    }
+
+    /**
+     * Drop exist slots.
+     *
+     * @param conn the datasource connection
+     * @throws SQLException the sql exp
+     */
+    public static void createIfNotExists(final Connection conn) throws 
SQLException {
+        if (isSlotNameExist(conn)) {
+            dropSlot(conn);

Review comment:
       If the slot is dropped here, will the replication data be lost?

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
##########
@@ -75,16 +77,19 @@ public void start() {
     }
     
     private void dump() {
-        try (Connection pgConnection = 
logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) 
dumperConfig.getDataSourceConfig());
-             PGReplicationStream stream = 
logicalReplication.createReplicationStream(pgConnection, 
PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) {
-            DecodingPlugin decodingPlugin = new 
TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
+        try {
+            Connection pgConnection = 
logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) 
dumperConfig.getDataSourceConfig());
+            PostgreSQLTimestampUtils utils = new 
PostgreSQLTimestampUtils(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
+            DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
+            PGReplicationStream stream = 
logicalReplication.createReplicationStream(pgConnection, 
PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber());
             while (isRunning()) {
                 ByteBuffer message = stream.readPending();
                 if (null == message) {
                     ThreadUtil.sleep(10L);
                     continue;
                 }
-                AbstractWalEvent event = decodingPlugin.decode(message, 
stream.getLastReceiveLSN());
+                AbstractWalEvent event = decodingPlugin.decode(message,
+                        new 
PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN()));

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLTimestampUtils.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal.decode;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.postgresql.jdbc.TimestampUtils;
+
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+/**
+ * PostgreSQL sequence.
+ */
+@AllArgsConstructor
+@Getter
+@Setter
+public class PostgreSQLTimestampUtils implements TimestampUtilsBase {
+    
+    private TimestampUtils timestampUtils;

Review comment:
       The class should be final, and the field should be final too.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
##########
@@ -27,13 +28,15 @@
     
     @Test
     public void assertCompareTo() {
-        WalPosition walPosition = new 
WalPosition(LogSequenceNumber.valueOf(100L));
+        WalPosition walPosition = new WalPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         assertThat(walPosition.compareTo(null), is(1));
-        assertThat(walPosition.compareTo(new 
WalPosition(LogSequenceNumber.valueOf(100L))), is(0));
+        assertThat(walPosition.compareTo(new WalPosition(
+                new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0));
     }
     
     @Test
     public void assertToString() {
-        assertThat(new 
WalPosition(LogSequenceNumber.valueOf(100L)).toString(), is("100"));
+        assertThat(new WalPosition(
+                new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), 
is("100"));

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussPositionInitializer.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.component;
+
+import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer;
+import 
org.apache.shardingsphere.scaling.opengauss.wal.event.OpenGaussLogSequenceNumber;
+import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
+import org.opengauss.replication.LogSequenceNumber;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * OpenGauss wal position initializer.
+ */
+public final class OpenGaussPositionInitializer implements PositionInitializer 
{
+    
+    @Override
+    public WalPosition init(final DataSource dataSource) throws SQLException {
+        try (Connection connection = dataSource.getConnection()) {
+            return getWalPosition(connection);
+        }
+    }
+    
+    @Override
+    public WalPosition init(final String data) {
+        return new WalPosition(new OpenGaussLogSequenceNumber(
+                LogSequenceNumber.valueOf(Long.parseLong(data)))

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/event/OpenGaussLogSequenceNumber.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal.event;
+
+import lombok.AllArgsConstructor;
+import 
org.apache.shardingsphere.scaling.postgresql.wal.event.LogSequenceNumberBase;
+import org.opengauss.replication.LogSequenceNumber;
+
+/**
+ * OpenGauss sequence.
+ */
+@AllArgsConstructor
+public class OpenGaussLogSequenceNumber implements LogSequenceNumberBase {
+    
+    private LogSequenceNumber logSequenceNumber;

Review comment:
       The class should be final, and the field should be final too.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
##########
@@ -51,7 +52,9 @@ public WalPosition init(final DataSource dataSource) throws 
SQLException {
     
     @Override
     public WalPosition init(final String data) {
-        return new 
WalPosition(LogSequenceNumber.valueOf(Long.parseLong(data)));
+        return new WalPosition(
+                new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data)))
+        );

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java
##########
@@ -83,7 +86,11 @@ private WalPosition getWalPosition(final Connection 
connection) throws SQLExcept
         try (PreparedStatement ps = 
connection.prepareStatement(getSql(connection));
              ResultSet rs = ps.executeQuery()) {
             rs.next();
-            return new WalPosition(LogSequenceNumber.valueOf(rs.getString(1)));
+            return new WalPosition(
+                    new PostgreSQLLogSequenceNumber(
+                            LogSequenceNumber.valueOf(rs.getString(1))
+                    )
+            );

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java
##########
@@ -75,16 +77,19 @@ public void start() {
     }
     
     private void dump() {
-        try (Connection pgConnection = 
logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) 
dumperConfig.getDataSourceConfig());
-             PGReplicationStream stream = 
logicalReplication.createReplicationStream(pgConnection, 
PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) {
-            DecodingPlugin decodingPlugin = new 
TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
+        try {
+            Connection pgConnection = 
logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) 
dumperConfig.getDataSourceConfig());
+            PostgreSQLTimestampUtils utils = new 
PostgreSQLTimestampUtils(pgConnection.unwrap(PgConnection.class).getTimestampUtils());
+            DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils);
+            PGReplicationStream stream = 
logicalReplication.createReplicationStream(pgConnection, 
PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber());

Review comment:
       `pgConnection` and `stream` should be declared in a `try-with-resources` 
block so that they are closed automatically; otherwise resources will leak, and 
the slot cannot be dropped when the scaling task finishes.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TimestampUtilsBase.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal.decode;
+
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+/**
+ * logical replication decoding plugin interface.
+ */
+public interface TimestampUtilsBase {
+    
+    /**
+     * Get time.
+     *
+     * @param cal the cal
+     * @param s the input
+     * @return Time the time
+     * @throws SQLException the exp
+     */
+    Time toTime(Calendar cal, String s) throws SQLException;

Review comment:
       Parameter names should be meaningful; the same applies to the 
`toTimestamp` method and to the sub-classes.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/event/OpenGaussLogSequenceNumber.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal.event;

Review comment:
       It might be better not to put it in the `event` package.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussTimestampUtils.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.opengauss.wal.decode;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import 
org.apache.shardingsphere.scaling.postgresql.wal.decode.TimestampUtilsBase;
+import org.opengauss.jdbc.TimestampUtils;
+
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+/**
+ * OpenGauss timestamputils.
+ */
+@AllArgsConstructor
+@Getter
+@Setter
+public class OpenGaussTimestampUtils implements TimestampUtilsBase {
+    
+    private TimestampUtils timestampUtils;

Review comment:
       The class should be final, and the field should be final too.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/LogSequenceNumberBase.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal.event;
+
+/**
+ * logical replication decoding plugin interface.

Review comment:
       The class comment seems wrong: it describes a decoding plugin interface, 
but this interface is `LogSequenceNumberBase`.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TimestampUtilsBase.java
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal.decode;
+
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Calendar;
+
+/**
+ * logical replication decoding plugin interface.
+ */
+public interface TimestampUtilsBase {

Review comment:
       Putting `Base` at the beginning of the class name might be better.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/LogSequenceNumberBase.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal.event;
+
+/**
+ * logical replication decoding plugin interface.
+ */
+public interface LogSequenceNumberBase {
+    

Review comment:
       Putting `Base` at the beginning of the class name might be better, as a 
convention.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/LogSequenceNumberBase.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal.event;
+
+/**
+ * logical replication decoding plugin interface.
+ */
+public interface LogSequenceNumberBase {
+    
+    /**
+     * Decode wal event from logical replication data.
+     *
+     * @return Long
+     */
+    long asLong();
+
+    /**
+     * Get the binded object.
+     *
+     * @return Long

Review comment:
       The `@return` comment should be meaningful.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java
##########
@@ -27,13 +28,15 @@
     
     @Test
     public void assertCompareTo() {
-        WalPosition walPosition = new 
WalPosition(LogSequenceNumber.valueOf(100L));
+        WalPosition walPosition = new WalPosition(new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));
         assertThat(walPosition.compareTo(null), is(1));
-        assertThat(walPosition.compareTo(new 
WalPosition(LogSequenceNumber.valueOf(100L))), is(0));
+        assertThat(walPosition.compareTo(new WalPosition(
+                new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0));

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/PostgreSQLLogSequenceNumber.java
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.scaling.postgresql.wal.event;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import org.postgresql.replication.LogSequenceNumber;
+
+/**
+ * PostgreSQL sequence.
+ */
+@AllArgsConstructor
+@Getter
+@Setter
+public class PostgreSQLLogSequenceNumber implements LogSequenceNumberBase {
+    
+    private LogSequenceNumber logSequenceNumber;

Review comment:
       The class should be final, and the field should be final too.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java
##########
@@ -79,14 +81,18 @@ public void assertCreateReplicationStreamSuccess() throws 
SQLException {
         
when(chainedLogicalStreamBuilder.withStartPosition(startPosition)).thenReturn(chainedLogicalStreamBuilder);
         
when(chainedLogicalStreamBuilder.withSlotName("")).thenReturn(chainedLogicalStreamBuilder);
         when(chainedLogicalStreamBuilder.withSlotOption(anyString(), 
eq(true))).thenReturn(chainedLogicalStreamBuilder, chainedLogicalStreamBuilder);
-        logicalReplication.createReplicationStream(pgConnection, "", 
startPosition);
+
+        LogSequenceNumberBase basePosition = new 
PostgreSQLLogSequenceNumber(startPosition);
+        logicalReplication.createReplicationStream(pgConnection, "", 
basePosition);
         verify(chainedLogicalStreamBuilder).start();
     }
     
     @Test(expected = SQLException.class)
     @SneakyThrows(SQLException.class)
     public void assertCreateReplicationStreamFailure() {
         when(pgConnection.unwrap(PGConnection.class)).thenThrow(new 
SQLException(""));
-        logicalReplication.createReplicationStream(pgConnection, "", 
LogSequenceNumber.valueOf(100L));
+        logicalReplication.createReplicationStream(pgConnection,
+                "",
+                new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));

Review comment:
       Should be in one line.

##########
File path: 
shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java
##########
@@ -70,7 +71,8 @@
     @Before
     public void setUp() {
         ScalingContext.getInstance().init(new ServerConfiguration());
-        position = new WalPosition(LogSequenceNumber.valueOf(100L));
+        position = new WalPosition(
+                new 
PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)));

Review comment:
       Should be in one line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to