This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7ffe598 Add show slave status database discovery type. (#16106)
7ffe598 is described below
commit 7ffe5985a7a6d870128f8126ce425985a94fe268
Author: zhaojinchao <[email protected]>
AuthorDate: Wed Mar 16 11:14:36 2022 +0800
Add show slave status database discovery type. (#16106)
* Add show slave status database discovery type.
* Add License.
* Adjust code style.
* Close resultSet resource
---
.../ShowSlaveStatusDatabaseDiscoveryType.java | 159 +++++++++++++++++++++
...ingsphere.dbdiscovery.spi.DatabaseDiscoveryType | 1 +
2 files changed, 160 insertions(+)
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/ShowSlaveStatusDatabaseDiscoveryType.java
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/ShowSlaveStatusDatabaseDiscoveryType.java
new file mode 100644
index 0000000..3181a2e
--- /dev/null
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/java/org/apache/shardingsphere/dbdiscovery/mysql/ShowSlaveStatusDatabaseDiscoveryType.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.dbdiscovery.mysql;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType;
+import org.apache.shardingsphere.infra.eventbus.ShardingSphereEventBus;
+import org.apache.shardingsphere.infra.rule.event.impl.DataSourceDisabledEvent;
+import
org.apache.shardingsphere.infra.rule.event.impl.PrimaryDataSourceChangedEvent;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Show slave status database discovery type.
+ */
+@Slf4j
+public final class ShowSlaveStatusDatabaseDiscoveryType implements
DatabaseDiscoveryType {
+
+ private static final String SHOW_SLAVE_STATUS = "SHOW SLAVE STATUS";
+
+ private String oldPrimaryDataSource;
+
+ @Getter
+ @Setter
+ private Properties props = new Properties();
+
+ @Override
+ public void checkDatabaseDiscoveryConfiguration(final String schemaName,
final Map<String, DataSource> dataSourceMap) {
+ //TODO Check master-slave mode
+ }
+
+ //TODO Consider merge MGR 's get primary datasource method
+ @Override
+ public void updatePrimaryDataSource(final String schemaName, final
Map<String, DataSource> dataSourceMap, final Collection<String>
disabledDataSourceNames, final String groupName) {
+ Map<String, DataSource> activeDataSourceMap = new
HashMap<>(dataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each ->
disabledDataSourceNames.contains(each.getKey()));
+ }
+ String primaryDatasource =
determinePrimaryDataSource(activeDataSourceMap);
+ if (primaryDatasource.isEmpty()) {
+ return;
+ }
+ if (!primaryDatasource.equals(oldPrimaryDataSource)) {
+ oldPrimaryDataSource = primaryDatasource;
+ ShardingSphereEventBus.getInstance().post(new
PrimaryDataSourceChangedEvent(schemaName, groupName, primaryDatasource));
+ }
+ }
+
+ //TODO Consider merge MGR 's get primary datasource method
+ private String determinePrimaryDataSource(final Map<String, DataSource>
dataSourceMap) {
+ String primaryDataSourceURL = findPrimaryDataSourceURL(dataSourceMap);
+ return findPrimaryDataSourceName(primaryDataSourceURL, dataSourceMap);
+ }
+
+ //TODO Consider merge MGR 's get primary datasource method
+ private String findPrimaryDataSourceURL(final Map<String, DataSource>
dataSourceMap) {
+ for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
+ try (Connection connection = entry.getValue().getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet =
statement.executeQuery(SHOW_SLAVE_STATUS)) {
+ if (resultSet.next()) {
+ return String.format("%s:%s",
resultSet.getString("Master_Host"), resultSet.getString("Master_Port"));
+ }
+ } catch (SQLException ex) {
+ log.error("An exception occurred while find primary data
source url", ex);
+ }
+ }
+ return null;
+ }
+
+ private String findPrimaryDataSourceName(final String
primaryDataSourceURL, final Map<String, DataSource> dataSourceMap) {
+ String result = "";
+ for (Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
+ String url;
+ try (Connection connection = entry.getValue().getConnection()) {
+ url = connection.getMetaData().getURL();
+ if (null != url && url.contains(primaryDataSourceURL)) {
+ return entry.getKey();
+ }
+ } catch (final SQLException ex) {
+ log.error("An exception occurred while find primary data
source name", ex);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void updateMemberState(final String schemaName, final Map<String,
DataSource> dataSourceMap, final Collection<String> disabledDataSourceNames) {
+ Map<String, DataSource> activeDataSourceMap = new
HashMap<>(dataSourceMap);
+ if (!disabledDataSourceNames.isEmpty()) {
+ activeDataSourceMap.entrySet().removeIf(each ->
disabledDataSourceNames.contains(each.getKey()));
+ }
+ for (Entry<String, DataSource> entry : activeDataSourceMap.entrySet())
{
+ if (oldPrimaryDataSource.equals(entry.getKey())) {
+ continue;
+ }
+ determineDatasourceState(schemaName, entry.getKey(),
entry.getValue());
+ }
+ }
+
+ private void determineDatasourceState(final String schemaName, final
String datasourceName, final DataSource dataSource) {
+ try (Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ long replicationDelayTime = getSecondsBehindMaster(statement);
+ if (replicationDelayTime * 1000 <
Integer.parseInt(props.getProperty("delay_milliseconds_threshold"))) {
+ ShardingSphereEventBus.getInstance().post(new
DataSourceDisabledEvent(schemaName, datasourceName, false));
+ } else {
+ ShardingSphereEventBus.getInstance().post(new
DataSourceDisabledEvent(schemaName, datasourceName, true));
+ }
+ } catch (SQLException ex) {
+ log.error("An exception occurred while find member data source
`Seconds_Behind_Master`", ex);
+ }
+ }
+
+ private long getSecondsBehindMaster(final Statement statement) throws
SQLException {
+ try (ResultSet resultSet = statement.executeQuery(SHOW_SLAVE_STATUS)) {
+ if (resultSet.next()) {
+ return resultSet.getLong("Seconds_Behind_Master");
+ }
+ return 0L;
+ }
+ }
+
+ @Override
+ public String getPrimaryDataSource() {
+ return oldPrimaryDataSource;
+ }
+
+ @Override
+ public String getType() {
+ return "SHOW_SLAVE_STATUS";
+ }
+}
diff --git
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType
index 8d40f4c..6dacef6 100644
---
a/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType
+++
b/shardingsphere-features/shardingsphere-db-discovery/shardingsphere-db-discovery-provider/shardingsphere-db-discovery-mysql/src/main/resources/META-INF/services/org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryType
@@ -16,3 +16,4 @@
#
org.apache.shardingsphere.dbdiscovery.mysql.MGRDatabaseDiscoveryType
+org.apache.shardingsphere.dbdiscovery.mysql.ShowSlaveStatusDatabaseDiscoveryType