sandynz commented on a change in pull request #12318: URL: https://github.com/apache/shardingsphere/pull/12318#discussion_r706121466
########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussPositionInitializer.java ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.opengauss.component; + +import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer; +import org.apache.shardingsphere.scaling.opengauss.wal.event.OpenGaussLogSequenceNumber; +import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition; +import org.opengauss.replication.LogSequenceNumber; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * OpenGauss wal position initializer. 
+ */ +public final class OpenGaussPositionInitializer implements PositionInitializer { + + @Override + public WalPosition init(final DataSource dataSource) throws SQLException { + try (Connection connection = dataSource.getConnection()) { + return getWalPosition(connection); + } + } + + @Override + public WalPosition init(final String data) { + return new WalPosition(new OpenGaussLogSequenceNumber( + LogSequenceNumber.valueOf(Long.parseLong(data))) + ); + } + + private WalPosition getWalPosition(final Connection connection) throws SQLException { + try (PreparedStatement ps = connection.prepareStatement(getSql(connection)); + ResultSet rs = ps.executeQuery()) { + rs.next(); + return new WalPosition( + new OpenGaussLogSequenceNumber( + LogSequenceNumber.valueOf(rs.getString(1)) + ) + ); Review comment: Should be in one line. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/OpenGaussLogicalReplication.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.shardingsphere.scaling.opengauss.wal; + +import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration; +import org.apache.shardingsphere.scaling.postgresql.wal.event.LogSequenceNumberBase; +import org.opengauss.PGProperty; +import org.opengauss.jdbc.PgConnection; +import org.opengauss.replication.LogSequenceNumber; +import org.opengauss.replication.PGReplicationStream; +import org.opengauss.util.PSQLException; + +import java.sql.CallableStatement; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Properties; + +/** + * OpenGauss logical replication. + */ +public final class OpenGaussLogicalReplication { + + public static final String SLOT_NAME = "sharding_scaling"; + + public static final String DECODE_PLUGIN = "test_decoding"; + + public static final String DUPLICATE_OBJECT_ERROR_CODE = "42710"; + + /** + * Create OpenGauss connection. 
+ * + * @param jdbcDataSourceConfig JDBC data source configuration + * @return OpenGauss connection + * @throws SQLException sql exception + */ + public Connection createPgConnection(final StandardJDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException { + return createConnection(jdbcDataSourceConfig); + } + + private Connection createConnection(final StandardJDBCDataSourceConfiguration jdbcDataSourceConfig) throws SQLException { + Properties props = new Properties(); + PGProperty.USER.set(props, jdbcDataSourceConfig.getHikariConfig().getUsername()); + PGProperty.PASSWORD.set(props, jdbcDataSourceConfig.getHikariConfig().getPassword()); + PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4"); + PGProperty.REPLICATION.set(props, "database"); + PGProperty.PREFER_QUERY_MODE.set(props, "simple"); + return DriverManager.getConnection(jdbcDataSourceConfig.getHikariConfig().getJdbcUrl(), props); + } + + /** + * Create OpenGauss replication stream. + * + * @param pgConnection OpenGauss connection + * @param startPosition start position + * @return replication stream + * @throws SQLException sql exception + */ + public PGReplicationStream createReplicationStream(final PgConnection pgConnection, final LogSequenceNumberBase startPosition) throws SQLException { + return pgConnection.getReplicationAPI() + .replicationStream() + .logical() + .withSlotName(SLOT_NAME) + .withSlotOption("include-xids", true) + .withSlotOption("skip-empty-xacts", true) + .withStartPosition((LogSequenceNumber) startPosition.get()) + .start(); + } + + /** + * Drop exist slots. + * + * @param conn the datasource connection + * @throws SQLException the sql exp + */ + public static void createIfNotExists(final Connection conn) throws SQLException { + if (isSlotNameExist(conn)) { + dropSlot(conn); Review comment: If slot is dropped here, is replication data lost? 
########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java ########## @@ -75,16 +77,19 @@ public void start() { } private void dump() { - try (Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig()); - PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) { - DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils()); + try { + Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig()); + PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(pgConnection.unwrap(PgConnection.class).getTimestampUtils()); + DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils); + PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber()); while (isRunning()) { ByteBuffer message = stream.readPending(); if (null == message) { ThreadUtil.sleep(10L); continue; } - AbstractWalEvent event = decodingPlugin.decode(message, stream.getLastReceiveLSN()); + AbstractWalEvent event = decodingPlugin.decode(message, + new PostgreSQLLogSequenceNumber(stream.getLastReceiveLSN())); Review comment: Should be in one line. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/PostgreSQLTimestampUtils.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.postgresql.wal.decode; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.postgresql.jdbc.TimestampUtils; + +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; + +/** + * PostgreSQL sequence. 
+ */ +@AllArgsConstructor +@Getter +@Setter +public class PostgreSQLTimestampUtils implements TimestampUtilsBase { + + private TimestampUtils timestampUtils; Review comment: Should be final class, and final field ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java ########## @@ -27,13 +28,15 @@ @Test public void assertCompareTo() { - WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L)); + WalPosition walPosition = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))); assertThat(walPosition.compareTo(null), is(1)); - assertThat(walPosition.compareTo(new WalPosition(LogSequenceNumber.valueOf(100L))), is(0)); + assertThat(walPosition.compareTo(new WalPosition( + new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0)); } @Test public void assertToString() { - assertThat(new WalPosition(LogSequenceNumber.valueOf(100L)).toString(), is("100")); + assertThat(new WalPosition( + new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))).toString(), is("100")); Review comment: Should be in one line. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/component/OpenGaussPositionInitializer.java ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.opengauss.component; + +import org.apache.shardingsphere.scaling.core.job.position.PositionInitializer; +import org.apache.shardingsphere.scaling.opengauss.wal.event.OpenGaussLogSequenceNumber; +import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition; +import org.opengauss.replication.LogSequenceNumber; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * OpenGauss wal position initializer. + */ +public final class OpenGaussPositionInitializer implements PositionInitializer { + + @Override + public WalPosition init(final DataSource dataSource) throws SQLException { + try (Connection connection = dataSource.getConnection()) { + return getWalPosition(connection); + } + } + + @Override + public WalPosition init(final String data) { + return new WalPosition(new OpenGaussLogSequenceNumber( + LogSequenceNumber.valueOf(Long.parseLong(data))) Review comment: Should be in one line. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/event/OpenGaussLogSequenceNumber.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.opengauss.wal.event; + +import lombok.AllArgsConstructor; +import org.apache.shardingsphere.scaling.postgresql.wal.event.LogSequenceNumberBase; +import org.opengauss.replication.LogSequenceNumber; + +/** + * OpenGauss sequence. + */ +@AllArgsConstructor +public class OpenGaussLogSequenceNumber implements LogSequenceNumberBase { + + private LogSequenceNumber logSequenceNumber; Review comment: Should be final class, and final field ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java ########## @@ -51,7 +52,9 @@ public WalPosition init(final DataSource dataSource) throws SQLException { @Override public WalPosition init(final String data) { - return new WalPosition(LogSequenceNumber.valueOf(Long.parseLong(data))); + return new WalPosition( + new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(Long.parseLong(data))) + ); Review comment: Should be in one line. 
########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLPositionInitializer.java ########## @@ -83,7 +86,11 @@ private WalPosition getWalPosition(final Connection connection) throws SQLExcept try (PreparedStatement ps = connection.prepareStatement(getSql(connection)); ResultSet rs = ps.executeQuery()) { rs.next(); - return new WalPosition(LogSequenceNumber.valueOf(rs.getString(1))); + return new WalPosition( + new PostgreSQLLogSequenceNumber( + LogSequenceNumber.valueOf(rs.getString(1)) + ) + ); Review comment: Should be in one line. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumper.java ########## @@ -75,16 +77,19 @@ public void start() { } private void dump() { - try (Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig()); - PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber())) { - DecodingPlugin decodingPlugin = new TestDecodingPlugin(pgConnection.unwrap(PgConnection.class).getTimestampUtils()); + try { + Connection pgConnection = logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) dumperConfig.getDataSourceConfig()); + PostgreSQLTimestampUtils utils = new PostgreSQLTimestampUtils(pgConnection.unwrap(PgConnection.class).getTimestampUtils()); + DecodingPlugin decodingPlugin = new TestDecodingPlugin(utils); + PGReplicationStream stream = logicalReplication.createReplicationStream(pgConnection, PostgreSQLPositionInitializer.SLOT_NAME, walPosition.getLogSequenceNumber()); Review comment: `pgConnection` and `stream` should be in a `try-with-resources` block so that they are closed automatically; otherwise it
will cause a resource leak, and the slot cannot be dropped when the scaling task finishes. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TimestampUtilsBase.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.postgresql.wal.decode; + +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; + +/** + * logical replication decoding plugin interface. + */ +public interface TimestampUtilsBase { + + /** + * Get time.
+ * + * @param cal the cal + * @param s the input + * @return Time the time + * @throws SQLException the exp + */ + Time toTime(Calendar cal, String s) throws SQLException; Review comment: Parameter names should be meaningful; the same applies to the `toTimestamp` method and the sub-classes. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/event/OpenGaussLogSequenceNumber.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.opengauss.wal.event; Review comment: It might be better not to put it in the `event` package. ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-opengauss/src/main/java/org/apache/shardingsphere/scaling/opengauss/wal/decode/OpenGaussTimestampUtils.java ########## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.opengauss.wal.decode; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.apache.shardingsphere.scaling.postgresql.wal.decode.TimestampUtilsBase; +import org.opengauss.jdbc.TimestampUtils; + +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; + +/** + * OpenGauss timestamputils. + */ +@AllArgsConstructor +@Getter +@Setter +public class OpenGaussTimestampUtils implements TimestampUtilsBase { + + private TimestampUtils timestampUtils; Review comment: Should be final class, and final field ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/LogSequenceNumberBase.java ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.postgresql.wal.event; + +/** + * logical replication decoding plugin interface. Review comment: Class comment seems wrong ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/decode/TimestampUtilsBase.java ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.postgresql.wal.decode; + +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; + +/** + * logical replication decoding plugin interface. 
+ */ +public interface TimestampUtilsBase { Review comment: Put `Base` at beginning of class name might be better ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/LogSequenceNumberBase.java ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.postgresql.wal.event; + +/** + * logical replication decoding plugin interface. + */ +public interface LogSequenceNumberBase { + Review comment: Put `Base` at beginning of class name might be better as convention ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/LogSequenceNumberBase.java ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.postgresql.wal.event; + +/** + * logical replication decoding plugin interface. + */ +public interface LogSequenceNumberBase { + + /** + * Decode wal event from logical replication data. + * + * @return Long + */ + long asLong(); + + /** + * Get the binded object. + * + * @return Long Review comment: `return` comment should be meaningful ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/WalPositionTest.java ########## @@ -27,13 +28,15 @@ @Test public void assertCompareTo() { - WalPosition walPosition = new WalPosition(LogSequenceNumber.valueOf(100L)); + WalPosition walPosition = new WalPosition(new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))); assertThat(walPosition.compareTo(null), is(1)); - assertThat(walPosition.compareTo(new WalPosition(LogSequenceNumber.valueOf(100L))), is(0)); + assertThat(walPosition.compareTo(new WalPosition( + new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L)))), is(0)); Review comment: Should be in one line. 
########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/main/java/org/apache/shardingsphere/scaling/postgresql/wal/event/PostgreSQLLogSequenceNumber.java ########## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.shardingsphere.scaling.postgresql.wal.event; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import org.postgresql.replication.LogSequenceNumber; + +/** + * PostgreSQL sequence. 
+ */ +@AllArgsConstructor +@Getter +@Setter +public class PostgreSQLLogSequenceNumber implements LogSequenceNumberBase { + + private LogSequenceNumber logSequenceNumber; Review comment: Should be final class, final field ########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/wal/LogicalReplicationTest.java ########## @@ -79,14 +81,18 @@ public void assertCreateReplicationStreamSuccess() throws SQLException { when(chainedLogicalStreamBuilder.withStartPosition(startPosition)).thenReturn(chainedLogicalStreamBuilder); when(chainedLogicalStreamBuilder.withSlotName("")).thenReturn(chainedLogicalStreamBuilder); when(chainedLogicalStreamBuilder.withSlotOption(anyString(), eq(true))).thenReturn(chainedLogicalStreamBuilder, chainedLogicalStreamBuilder); - logicalReplication.createReplicationStream(pgConnection, "", startPosition); + + LogSequenceNumberBase basePosition = new PostgreSQLLogSequenceNumber(startPosition); + logicalReplication.createReplicationStream(pgConnection, "", basePosition); verify(chainedLogicalStreamBuilder).start(); } @Test(expected = SQLException.class) @SneakyThrows(SQLException.class) public void assertCreateReplicationStreamFailure() { when(pgConnection.unwrap(PGConnection.class)).thenThrow(new SQLException("")); - logicalReplication.createReplicationStream(pgConnection, "", LogSequenceNumber.valueOf(100L)); + logicalReplication.createReplicationStream(pgConnection, + "", + new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))); Review comment: Should be in one line. 
########## File path: shardingsphere-scaling/shardingsphere-scaling-dialect/shardingsphere-scaling-postgresql/src/test/java/org/apache/shardingsphere/scaling/postgresql/component/PostgreSQLWalDumperTest.java ########## @@ -70,7 +71,8 @@ @Before public void setUp() { ScalingContext.getInstance().init(new ServerConfiguration()); - position = new WalPosition(LogSequenceNumber.valueOf(100L)); + position = new WalPosition( + new PostgreSQLLogSequenceNumber(LogSequenceNumber.valueOf(100L))); Review comment: Should be in one line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
