This is an automated email from the ASF dual-hosted git repository. yx9o pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push: new 375e18ff910 Remove DatabaseDiscoveryType.updateMemberState() (#17048) 375e18ff910 is described below commit 375e18ff9103f992e84298c02c9f9fff6b4bede2 Author: Liang Zhang <zhangli...@apache.org> AuthorDate: Sun Apr 24 15:02:39 2022 +0800 Remove DatabaseDiscoveryType.updateMemberState() (#17048) * Refactor DatabaseDiscoveryType.updateMemberState() * Refactor MGRMySQLDatabaseDiscoveryType.updateMemberState() * Refactor MGRMySQLDatabaseDiscoveryType.updateMemberState() * Refactor MGRMySQLDatabaseDiscoveryType * Refactor OpenGaussNormalReplicationDatabaseDiscoveryType * Refactor ReplicationDatabaseDiscoveryType.updateMemberState() * Remove DatabaseDiscoveryType.updateMemberState() * Remove DatabaseDiscoveryType.updateMemberState() --- .../dbdiscovery/spi/DatabaseDiscoveryType.java | 12 ++-- .../algorithm/DatabaseDiscoveryEngine.java | 18 ++++-- .../dbdiscovery/heartbeat/HeartbeatJob.java | 6 +- .../dbdiscovery/rule/DatabaseDiscoveryRule.java | 4 +- .../fixture/CoreFixtureDatabaseDiscoveryType.java | 6 +- .../DistSQLFixtureDatabaseDiscoveryType.java | 6 +- .../type/mgr/MGRMySQLDatabaseDiscoveryType.java | 24 +------- ...ormalReplicationMySQLDatabaseDiscoveryType.java | 18 +----- .../type/mgr/MGRDatabaseDiscoveryTypeTest.java | 64 +++++----------------- ...aussNormalReplicationDatabaseDiscoveryType.java | 11 +--- 10 files changed, 53 insertions(+), 116 deletions(-) diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-api/src/main/java/org/apache/shardingsphere/dbdiscovery/spi/DatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-api/src/main/java/org/apache/shardingsphere/dbdiscovery/spi/DatabaseDiscoveryType.java index c35dc1a2f1e..fa53a658e68 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-api/src/main/java/org/apache/shardingsphere/dbdiscovery/spi/DatabaseDiscoveryType.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-api/src/main/java/org/apache/shardingsphere/dbdiscovery/spi/DatabaseDiscoveryType.java @@ -19,6 +19,7 @@ package org.apache.shardingsphere.dbdiscovery.spi; import org.apache.shardingsphere.dbdiscovery.spi.status.HighlyAvailableStatus; import org.apache.shardingsphere.infra.config.algorithm.ShardingSphereAlgorithm; +import org.apache.shardingsphere.infra.storage.StorageNodeDataSource; import javax.sql.DataSource; import java.sql.SQLException; @@ -48,13 +49,12 @@ public interface DatabaseDiscoveryType extends ShardingSphereAlgorithm { Optional<String> findPrimaryDataSourceName(Map<String, DataSource> dataSourceMap); /** - * Update member state. - * - * @param databaseName database name - * @param dataSourceMap data source map - * @param groupName group name + * Get storage node data source. + * + * @param replicaDataSource replica data source + * @return storage node data source */ - void updateMemberState(String databaseName, Map<String, DataSource> dataSourceMap, String groupName); + StorageNodeDataSource getStorageNodeDataSource(DataSource replicaDataSource); /** * Get primary data source. diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java index 4fd393f9a91..30623910dcb 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java @@ -25,6 +25,7 @@ import org.apache.shardingsphere.dbdiscovery.spi.status.HighlyAvailableStatus; import org.apache.shardingsphere.dbdiscovery.spi.status.RoleSeparatedHighlyAvailableStatus; import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus; import org.apache.shardingsphere.infra.metadata.schema.QualifiedDatabase; +import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent; import org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent; import javax.sql.DataSource; @@ -86,19 +87,28 @@ public final class DatabaseDiscoveryEngine { * Update primary data source. * * @param databaseName database name + * @param groupName group name * @param dataSourceMap data source map * @param disabledDataSourceNames disabled data source names - * @param groupName group name * @return updated primary data source name */ - public String updatePrimaryDataSource(final String databaseName, final Map<String, DataSource> dataSourceMap, final Collection<String> disabledDataSourceNames, final String groupName) { + public String updatePrimaryDataSource(final String databaseName, final String groupName, final Map<String, DataSource> dataSourceMap, final Collection<String> disabledDataSourceNames) { Optional<String> newPrimaryDataSourceName = databaseDiscoveryType.findPrimaryDataSourceName(getActiveDataSourceMap(dataSourceMap, disabledDataSourceNames)); if (newPrimaryDataSourceName.isPresent() && !newPrimaryDataSourceName.get().equals(databaseDiscoveryType.getPrimaryDataSource())) { databaseDiscoveryType.setPrimaryDataSource(newPrimaryDataSourceName.get()); ShardingSphereEventBus.getInstance().post(new PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, newPrimaryDataSourceName.get()))); } - databaseDiscoveryType.updateMemberState(databaseName, dataSourceMap, groupName); - return newPrimaryDataSourceName.orElseGet(databaseDiscoveryType::getPrimaryDataSource); + String result = newPrimaryDataSourceName.orElseGet(databaseDiscoveryType::getPrimaryDataSource); + postReplicaDataSourceDisabledEvent(databaseName, groupName, result, dataSourceMap); + return result; + } + + private void postReplicaDataSourceDisabledEvent(final String databaseName, final String groupName, final String primaryDataSourceName, final Map<String, DataSource> dataSourceMap) { + for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) { + if (!entry.getKey().equals(primaryDataSourceName)) { + ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), databaseDiscoveryType.getStorageNodeDataSource(entry.getValue()))); + } + } } private Map<String, DataSource> getActiveDataSourceMap(final Map<String, DataSource> dataSourceMap, final Collection<String> disabledDataSourceNames) { diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java index 4b925f52057..eb1b2ce3cc3 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java @@ -35,16 +35,16 @@ public final class HeartbeatJob implements SimpleJob { private final String databaseName; - private final Map<String, DataSource> dataSourceMap; - private final String groupName; + private final Map<String, DataSource> dataSourceMap; + private final DatabaseDiscoveryType databaseDiscoveryType; private final Collection<String> disabledDataSourceNames; @Override public void execute(final ShardingContext shardingContext) { - new DatabaseDiscoveryEngine(databaseDiscoveryType).updatePrimaryDataSource(databaseName, dataSourceMap, disabledDataSourceNames, groupName); + new DatabaseDiscoveryEngine(databaseDiscoveryType).updatePrimaryDataSource(databaseName, groupName, dataSourceMap, disabledDataSourceNames); } } diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java index 662e6b4af41..3819d2696d6 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java @@ -115,7 +115,7 @@ public final class DatabaseDiscoveryRule implements SchemaRule, DataSourceContai } catch (final SQLException ex) { throw new ShardingSphereException(ex); } - dataSourceRule.updatePrimaryDataSourceName(engine.updatePrimaryDataSource(databaseName, originalDataSourceMap, dataSourceRule.getDisabledDataSourceNames(), groupName)); + dataSourceRule.updatePrimaryDataSourceName(engine.updatePrimaryDataSource(databaseName, groupName, originalDataSourceMap, dataSourceRule.getDisabledDataSourceNames())); } } @@ -190,7 +190,7 @@ public final class DatabaseDiscoveryRule implements SchemaRule, DataSourceContai Map<String, DataSource> dataSources = dataSourceMap.entrySet().stream().filter(dataSource -> !entry.getValue().getDisabledDataSourceNames().contains(dataSource.getKey())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); String jobName = entry.getValue().getDatabaseDiscoveryType().getType() + "-" + databaseName + "-" + entry.getValue().getGroupName(); - CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, dataSources, entry.getValue().getGroupName(), entry.getValue().getDatabaseDiscoveryType(), + CronJob job = new CronJob(jobName, each -> new HeartbeatJob(databaseName, entry.getValue().getGroupName(), dataSources, entry.getValue().getDatabaseDiscoveryType(), entry.getValue().getDisabledDataSourceNames()).execute(null), entry.getValue().getHeartbeatProps().getProperty("keep-alive-cron")); modeScheduleContext.get().startCronJob(job); diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/fixture/CoreFixtureDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/fixture/CoreFixtureDatabaseDiscoveryType.java index 84a0fd823a2..6ea467190b9 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/fixture/CoreFixtureDatabaseDiscoveryType.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-core/src/test/java/org/apache/shardingsphere/dbdiscovery/fixture/CoreFixtureDatabaseDiscoveryType.java @@ -19,6 +19,9 @@ package org.apache.shardingsphere.dbdiscovery.fixture; import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType; import org.apache.shardingsphere.dbdiscovery.spi.status.HighlyAvailableStatus; +import org.apache.shardingsphere.infra.storage.StorageNodeDataSource; +import org.apache.shardingsphere.infra.storage.StorageNodeRole; +import org.apache.shardingsphere.infra.storage.StorageNodeStatus; import javax.sql.DataSource; import java.util.Map; @@ -39,7 +42,8 @@ public final class CoreFixtureDatabaseDiscoveryType implements DatabaseDiscovery } @Override - public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) { + public StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) { + return new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.ENABLED); } @Override diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/fixture/DistSQLFixtureDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/fixture/DistSQLFixtureDatabas [...] index 31c7f953764..9557363ae2c 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/fixture/DistSQLFixtureDatabaseDiscoveryType.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-distsql/shardingsphere-db-discovery-distsql-handler/src/test/java/org/apache/shardingsphere/dbdiscovery/distsql/handler/fixture/DistSQLFixtureDatabaseDiscoveryType.java @@ -19,6 +19,9 @@ package org.apache.shardingsphere.dbdiscovery.distsql.handler.fixture; import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType; import org.apache.shardingsphere.dbdiscovery.spi.status.HighlyAvailableStatus; +import org.apache.shardingsphere.infra.storage.StorageNodeDataSource; +import org.apache.shardingsphere.infra.storage.StorageNodeRole; +import org.apache.shardingsphere.infra.storage.StorageNodeStatus; import javax.sql.DataSource; import java.util.Map; @@ -39,7 +42,8 @@ public final class DistSQLFixtureDatabaseDiscoveryType implements DatabaseDiscov } @Override - public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) { + public StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) { + return new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.ENABLED); } @Override diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java index c0eee856aab..99b3b27ec87 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRMySQLDatabaseDiscoveryType.java @@ -20,8 +20,6 @@ package org.apache.shardingsphere.dbdiscovery.mysql.type.mgr; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.dbdiscovery.mysql.AbstractMySQLDatabaseDiscoveryType; import org.apache.shardingsphere.infra.database.metadata.dialect.MySQLDataSourceMetaData; -import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus; -import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent; import org.apache.shardingsphere.infra.storage.StorageNodeDataSource; import org.apache.shardingsphere.infra.storage.StorageNodeRole; import org.apache.shardingsphere.infra.storage.StorageNodeStatus; @@ -34,8 +32,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Collection; import java.util.LinkedList; -import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; /** @@ -97,28 +93,12 @@ public final class MGRMySQLDatabaseDiscoveryType extends AbstractMySQLDatabaseDi @Override protected Optional<String> loadPrimaryDatabaseInstanceURL(final Statement statement) throws SQLException { try (ResultSet resultSet = statement.executeQuery(QUERY_PRIMARY_DATA_SOURCE)) { - if (resultSet.next()) { - return Optional.of(String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT"))); - } - return Optional.empty(); + return resultSet.next() ? Optional.of(String.format("%s:%s", resultSet.getString("MEMBER_HOST"), resultSet.getString("MEMBER_PORT"))) : Optional.empty(); } } @Override - public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) { - for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) { - if (!entry.getKey().equals(getPrimaryDataSource())) { - postDataSourceDisabledEvent(databaseName, groupName, entry.getKey(), entry.getValue()); - } - } - } - - private void postDataSourceDisabledEvent(final String databaseName, final String groupName, final String replicaDataSourceName, final DataSource replicaDataSource) { - StorageNodeDataSource storageNodeDataSource = getStorageNodeDataSource(replicaDataSource); - ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, replicaDataSourceName, storageNodeDataSource)); - } - - private StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) { + public StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) { return new StorageNodeDataSource(StorageNodeRole.MEMBER, isDisabledDataSource(replicaDataSource) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED); } diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLD [...] index 4f4a965a01a..6b3a83f311e 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/type/replication/MySQLNormalReplicationMySQLDatabaseDiscoveryType.java @@ -19,8 +19,6 @@ package org.apache.shardingsphere.dbdiscovery.mysql.type.replication; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.dbdiscovery.mysql.AbstractMySQLDatabaseDiscoveryType; -import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus; -import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent; import org.apache.shardingsphere.infra.storage.StorageNodeDataSource; import org.apache.shardingsphere.infra.storage.StorageNodeRole; import org.apache.shardingsphere.infra.storage.StorageNodeStatus; @@ -30,8 +28,6 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; -import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; /** @@ -66,19 +62,7 @@ public final class MySQLNormalReplicationMySQLDatabaseDiscoveryType extends Abst } @Override - public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) { - for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) { - if (!entry.getKey().equals(getPrimaryDataSource())) { - postDataSourceDisabledEvent(databaseName, groupName, entry.getKey(), entry.getValue()); - } - } - } - - private void postDataSourceDisabledEvent(final String databaseName, final String groupName, final String datasourceName, final DataSource replicaDataSource) { - ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, datasourceName, getStorageNodeDataSource(replicaDataSource))); - } - - private StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) { + public StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) { try ( Connection connection = replicaDataSource.getConnection(); Statement statement = connection.createStatement()) { diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/test/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRDatabaseDiscoveryTypeTest.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/test/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRDatabaseDiscoveryTypeTest.java index 05940cea768..4750ad30412 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/test/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRDatabaseDiscoveryTypeTest.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/test/java/org/apache/shardingsphere/dbdiscovery/mysql/type/mgr/MGRDatabaseDiscoveryTypeTest.java @@ -17,19 +17,10 @@ package org.apache.shardingsphere.dbdiscovery.mysql.type.mgr; -import com.google.common.eventbus.EventBus; -import org.apache.shardingsphere.dbdiscovery.mysql.AbstractMySQLDatabaseDiscoveryType; -import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus; -import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent; import org.apache.shardingsphere.infra.storage.StorageNodeDataSource; -import org.apache.shardingsphere.infra.storage.StorageNodeRole; -import org.apache.shardingsphere.infra.storage.StorageNodeStatus; -import org.junit.Ignore; import org.junit.Test; -import org.mockito.Mockito; import javax.sql.DataSource; -import java.lang.reflect.Field; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; @@ -48,8 +39,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public final class MGRDatabaseDiscoveryTypeTest { @@ -112,46 +101,19 @@ public final class MGRDatabaseDiscoveryTypeTest { assertThat(actual.get(), is("ds_2")); } - // TODO Fix me - @Ignore @Test - public void assertUpdateMemberState() throws SQLException, IllegalAccessException, NoSuchFieldException { - Field declaredField = AbstractMySQLDatabaseDiscoveryType.class.getDeclaredField("oldPrimaryDataSource"); - declaredField.setAccessible(true); - declaredField.set(databaseDiscoveryType, "ds_0"); - EventBus eventBus = mock(EventBus.class); - mockStatic(ShardingSphereEventBus.class); - when(ShardingSphereEventBus.getInstance()).thenReturn(eventBus); - List<DataSource> dataSources = new LinkedList<>(); - List<Connection> connections = new LinkedList<>(); - List<Statement> statements = new LinkedList<>(); - List<ResultSet> resultSets = new LinkedList<>(); - List<DatabaseMetaData> databaseMetaData = new LinkedList<>(); - for (int i = 0; i < 3; i++) { - dataSources.add(mock(DataSource.class)); - connections.add(mock(Connection.class)); - statements.add(mock(Statement.class)); - resultSets.add(mock(ResultSet.class)); - databaseMetaData.add(mock(DatabaseMetaData.class)); - } - String sql = "SELECT MEMBER_HOST, MEMBER_PORT, MEMBER_STATE FROM performance_schema.replication_group_members"; - for (int i = 0; i < 3; i++) { - when(dataSources.get(i).getConnection()).thenReturn(connections.get(i)); - when(connections.get(i).createStatement()).thenReturn(statements.get(i)); - when(statements.get(i).executeQuery(sql)).thenReturn(resultSets.get(i)); - when(resultSets.get(i).next()).thenReturn(true, false); - when(resultSets.get(i).getString("MEMBER_HOST")).thenReturn("127.0.0.1"); - when(resultSets.get(i).getString("MEMBER_PORT")).thenReturn(Integer.toString(3306 + i)); - when(resultSets.get(i).getString("MEMBER_STATE")).thenReturn("ONLINE"); - when(connections.get(i).getMetaData()).thenReturn(databaseMetaData.get(i)); - when(databaseMetaData.get(i).getURL()).thenReturn("jdbc:mysql://127.0.0.1:" + (3306 + i) + "/ds_0?serverTimezone=UTC&useSSL=false"); - } - Map<String, DataSource> dataSourceMap = new HashMap<>(3, 1); - for (int i = 0; i < 3; i++) { - dataSourceMap.put(String.format("ds_%s", i), dataSources.get(i)); - } - databaseDiscoveryType.updateMemberState("discovery_db", dataSourceMap, "readwrite_ds"); - verify(eventBus).post(Mockito.refEq(new DataSourceDisabledEvent("discovery_db", "readwrite_ds", "ds_2", - new StorageNodeDataSource(StorageNodeRole.MEMBER, StorageNodeStatus.DISABLED)))); + public void assertGetDisabledStorageNodeDataSource() throws SQLException { + databaseDiscoveryType.setPrimaryDataSource("foo_ds"); + DataSource dataSource = mock(DataSource.class, RETURNS_DEEP_STUBS); + when(dataSource.getConnection().getMetaData().getURL()).thenReturn("jdbc:mysql://127.0.0.1:3306/foo_ds"); + StorageNodeDataSource actual = databaseDiscoveryType.getStorageNodeDataSource(dataSource); + assertThat(actual.getRole(), is("member")); + assertThat(actual.getStatus(), is("disabled")); + assertThat(actual.getReplicationDelayMilliseconds(), is(0L)); + } + + @Test + public void assertGetEnabledStorageNodeDataSource() { + // TODO } } diff --git a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicat [...] index 4efc78c1a77..f7781157534 100644 --- a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java +++ b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-opengauss/src/main/java/org/apache/shardingsphere/dbdiscovery/opengauss/replication/OpenGaussNormalReplicationDatabaseDiscoveryType.java @@ -21,8 +21,6 @@ import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType; -import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus; -import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent; import org.apache.shardingsphere.infra.storage.StorageNodeDataSource; import org.apache.shardingsphere.infra.storage.StorageNodeRole; import org.apache.shardingsphere.infra.storage.StorageNodeStatus; @@ -79,13 +77,8 @@ public final class OpenGaussNormalReplicationDatabaseDiscoveryType implements Da } @Override - public void updateMemberState(final String databaseName, final Map<String, DataSource> dataSourceMap, final String groupName) { - for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) { - if (!entry.getKey().equals(primaryDataSource)) { - StorageNodeStatus storageNodeStatus = isDisabledDataSource(entry.getValue()) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED; - ShardingSphereEventBus.getInstance().post(new DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), new StorageNodeDataSource(StorageNodeRole.MEMBER, storageNodeStatus))); - } - } + public StorageNodeDataSource getStorageNodeDataSource(final DataSource replicaDataSource) { + return new StorageNodeDataSource(StorageNodeRole.MEMBER, isDisabledDataSource(replicaDataSource) ? StorageNodeStatus.DISABLED : StorageNodeStatus.ENABLED); } private boolean isDisabledDataSource(final DataSource replicaDataSource) {