This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new b79d2fd9955 Add ReadOnly state for Proxy (#23944)
b79d2fd9955 is described below

commit b79d2fd995583ee5cb02c3b35e7d3a7f62abd2e4
Author: zhaojinchao <[email protected]>
AuthorDate: Sat Feb 4 08:25:16 2023 +0800

    Add ReadOnly state for Proxy (#23944)
---
 .../user-manual/error-code/sql-error-code.cn.md    |  1 +
 .../user-manual/error-code/sql-error-code.en.md    |  1 +
 .../algorithm/DatabaseDiscoveryEngine.java         | 26 ++++++--
 .../dbdiscovery/heartbeat/HeartbeatJob.java        |  6 +-
 .../dbdiscovery/rule/DatabaseDiscoveryRule.java    |  4 +-
 .../shardingsphere/infra/state/StateType.java      |  2 +-
 .../event/ComputeNodeStatusChangedEvent.java       |  6 +-
 .../subscriber/ComputeNodeStatusSubscriber.java    |  2 +-
 .../ral/updatable/SetInstanceStatusHandler.java    |  4 +-
 .../frontend/exception/ReadOnlyException.java      | 19 +++---
 .../proxy/frontend/state/ProxyStateContext.java    |  2 +
 .../frontend/state/impl/ReadOnlyProxyState.java    | 70 ++++++++++++++++++++++
 12 files changed, 115 insertions(+), 28 deletions(-)

diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md 
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index cdb54cda0ce..c620aef7976 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -51,6 +51,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
 | --------- | ----------- | ------ |
 | 08000     | 13000       | Can not register driver, reason is: %s |
 | 01000     | 13010       | Circuit break open, the request has been ignored. |
+| 01000     | 13011       | The current instance is read-only, Not allowed 
write traffic. |
 | 08000     | 13020       | Can not get %d connections one time, partition 
succeed connection(%d) have released. Please consider increasing the 
\`maxPoolSize\` of the data sources or decreasing the 
\`max-connections-size-per-query\` in properties. |
 | 08000     | 13030       | Connection has been closed. |
 | 08000     | 13031       | Result set has been closed. |
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md 
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index f8b13c497a1..84b6e67783b 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -51,6 +51,7 @@ SQL error codes provide by standard `SQL State`, `Vendor 
Code` and `Reason`, whi
 | --------- | ----------- | ------ |
 | 08000     | 13000       | Can not register driver, reason is: %s |
 | 01000     | 13010       | Circuit break open, the request has been ignored. |
+| 01000     | 13011       | The current instance is read-only, Not allowed 
write traffic. |
 | 08000     | 13020       | Can not get %d connections one time, partition 
succeed connection(%d) have released. Please consider increasing the 
\`maxPoolSize\` of the data sources or decreasing the 
\`max-connections-size-per-query\` in properties. |
 | 08000     | 13030       | Connection has been closed. |
 | 08000     | 13031       | Result set has been closed. |
diff --git 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
index 73d279d6eab..694e0533bf0 100644
--- 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
+++ 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
@@ -17,13 +17,16 @@
 
 package org.apache.shardingsphere.dbdiscovery.algorithm;
 
+import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider;
 import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus;
 import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.state.StateType;
+import 
org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
 import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
 import 
org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
@@ -46,7 +49,7 @@ public final class DatabaseDiscoveryEngine {
     
     private final DatabaseDiscoveryProvider provider;
     
-    private final EventBusContext eventBusContext;
+    private final InstanceContext instanceContext;
     
     /**
      * Check environment of database cluster.
@@ -71,6 +74,7 @@ public final class DatabaseDiscoveryEngine {
     public String changePrimaryDataSource(final String databaseName, final 
String groupName, final String originalPrimaryDataSourceName,
                                           final Map<String, DataSource> 
dataSourceMap, final Collection<String> disabledDataSourceNames) {
         Optional<String> newPrimaryDataSourceName = 
findPrimaryDataSourceName(dataSourceMap);
+        postComputeNodeStatusChangedEvent(newPrimaryDataSourceName.orElse(""));
         newPrimaryDataSourceName.ifPresent(optional -> 
postPrimaryChangedEvent(databaseName, groupName, originalPrimaryDataSourceName, 
optional));
         Map<String, DataSource> replicaDataSourceMap = new 
HashMap<>(dataSourceMap);
         newPrimaryDataSourceName.ifPresent(replicaDataSourceMap::remove);
@@ -91,9 +95,19 @@ public final class DatabaseDiscoveryEngine {
         return Optional.empty();
     }
     
+    private void postComputeNodeStatusChangedEvent(final String 
newPrimaryDataSourceName) { // Posts a compute node status change for the current instance based on whether a primary data source name was found; changePrimaryDataSource passes "" when none was found.
+        if (Strings.isNullOrEmpty(newPrimaryDataSourceName) && 
StateType.OK.equals(instanceContext.getInstance().getState().getCurrentState()))
 { // No primary name and the instance is currently OK: request READ_ONLY for the current instance id.
+            instanceContext.getEventBusContext().post(new 
ComputeNodeStatusChangedEvent(instanceContext.getInstance().getCurrentInstanceId(),
 StateType.READ_ONLY));
+            return; // Skip the recovery check below; it cannot apply when the name is empty.
+        }
+        if (!Strings.isNullOrEmpty(newPrimaryDataSourceName) && 
StateType.READ_ONLY.equals(instanceContext.getInstance().getState().getCurrentState()))
 { // A primary name exists and the instance is currently READ_ONLY: request OK again.
+            instanceContext.getEventBusContext().post(new 
ComputeNodeStatusChangedEvent(instanceContext.getInstance().getCurrentInstanceId(),
 StateType.OK));
+        } // Any other current state (e.g. CIRCUIT_BREAK or LOCK) posts nothing.
+    } // NOTE(review): the state is per instance but this runs once per changePrimaryDataSource call (per database/group); with several groups, one group may request READ_ONLY and another may request OK - confirm this is intended.
+    
     private void postPrimaryChangedEvent(final String databaseName, final 
String groupName, final String originalPrimaryDataSourceName, final String 
newPrimaryDataSourceName) {
         if (!newPrimaryDataSourceName.equals(originalPrimaryDataSourceName)) {
-            eventBusContext.post(new PrimaryDataSourceChangedEvent(new 
QualifiedDatabase(databaseName, groupName, newPrimaryDataSourceName)));
+            instanceContext.getEventBusContext().post(new 
PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName, 
newPrimaryDataSourceName)));
         }
     }
     
@@ -104,16 +118,16 @@ public final class DatabaseDiscoveryEngine {
             StorageNodeDataSource replicaStorageNode = 
createReplicaStorageNode(loadReplicaStatus(entry.getValue()));
             if (DataSourceState.ENABLED == replicaStorageNode.getStatus()) {
                 enabledReplicasCount += 
disabledDataSourceNames.contains(entry.getKey()) ? 1 : 0;
-                eventBusContext.post(new DataSourceDisabledEvent(databaseName, 
groupName, entry.getKey(), replicaStorageNode));
+                instanceContext.getEventBusContext().post(new 
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), 
replicaStorageNode));
                 continue;
             }
             if (provider.getMinEnabledReplicas().isPresent() && 0 == 
provider.getMinEnabledReplicas().get()) {
-                eventBusContext.post(new DataSourceDisabledEvent(databaseName, 
groupName, entry.getKey(), replicaStorageNode));
+                instanceContext.getEventBusContext().post(new 
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), 
replicaStorageNode));
                 continue;
             }
             if (enabledReplicasCount > provider.getMinEnabledReplicas().get()) 
{
                 enabledReplicasCount -= 
disabledDataSourceNames.contains(entry.getKey()) ? 0 : 1;
-                eventBusContext.post(new DataSourceDisabledEvent(databaseName, 
groupName, entry.getKey(), replicaStorageNode));
+                instanceContext.getEventBusContext().post(new 
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(), 
replicaStorageNode));
             }
         }
     }
diff --git 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
index cab23c449bf..67ac474b253 100644
--- 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
+++ 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.dbdiscovery.algorithm.DatabaseDiscoveryEngine;
 import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
 import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
 
 import javax.sql.DataSource;
 import java.util.Collection;
@@ -46,11 +46,11 @@ public final class HeartbeatJob implements SimpleJob {
     
     private final Collection<String> disabledDataSourceNames;
     
-    private final EventBusContext eventBusContext;
+    private final InstanceContext instanceContext;
     
     @Override
     public void execute(final ShardingContext shardingContext) {
-        new DatabaseDiscoveryEngine(databaseDiscoveryProvider, 
eventBusContext).changePrimaryDataSource(databaseName, groupName, 
originalPrimaryDataSourceName,
+        new DatabaseDiscoveryEngine(databaseDiscoveryProvider, 
instanceContext).changePrimaryDataSource(databaseName, groupName, 
originalPrimaryDataSourceName,
                 dataSourceMap, disabledDataSourceNames);
     }
 }
diff --git 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index d0a3063f13d..252481d87d8 100644
--- 
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++ 
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -112,7 +112,7 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
             String groupName = entry.getKey();
             DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue();
             Map<String, DataSource> originalDataSourceMap = 
dataSourceRule.getDataSourceGroup(dataSourceMap);
-            DatabaseDiscoveryEngine engine = new 
DatabaseDiscoveryEngine(dataSourceRule.getProvider(), 
instanceContext.getEventBusContext());
+            DatabaseDiscoveryEngine engine = new 
DatabaseDiscoveryEngine(dataSourceRule.getProvider(), instanceContext);
             engine.checkEnvironment(databaseName, originalDataSourceMap);
             
dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource(
                     databaseName, groupName, 
entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap, 
dataSourceRule.getDisabledDataSourceNames()));
@@ -177,7 +177,7 @@ public final class DatabaseDiscoveryRule implements 
DatabaseRule, DataSourceCont
             DatabaseDiscoveryDataSourceRule rule = entry.getValue();
             String jobName = rule.getProvider().getType() + "-" + databaseName 
+ "-" + rule.getGroupName();
             CronJob job = new CronJob(jobName, each -> new 
HeartbeatJob(databaseName, rule.getGroupName(), 
rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
-                    rule.getProvider(), rule.getDisabledDataSourceNames(), 
instanceContext.getEventBusContext()).execute(null),
+                    rule.getProvider(), rule.getDisabledDataSourceNames(), 
instanceContext).execute(null),
                     rule.getHeartbeatProps().getProperty("keep-alive-cron"));
             scheduleContext.startSchedule(job);
         }
diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
index e68c1d7e329..e87e24cf687 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.infra.state;
  */
 public enum StateType {
     
-    OK, CIRCUIT_BREAK, LOCK
+    OK, CIRCUIT_BREAK, READ_ONLY, LOCK
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java
similarity index 92%
copy from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
copy to 
mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java
index fd7d5191ebe..d7f1df5bb67 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
+++ 
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+package org.apache.shardingsphere.mode.metadata.compute.event;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.infra.state.StateType;
 @Getter
 public final class ComputeNodeStatusChangedEvent {
     
-    private final StateType state;
-    
     private final String instanceId;
+    
+    private final StateType state;
 }
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 7f36f2b7acb..97e65f58f14 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++ 
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
 
 import com.google.common.eventbus.Subscribe;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
 import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
 import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
diff --git 
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
 
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
index 3ab33a7c853..0c1010fc923 100644
--- 
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
+++ 
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.infra.state.StateType;
 import 
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
 import org.apache.shardingsphere.mode.manager.ContextManager;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
+import 
org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
 import 
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.UpdatableRALBackendHandler;
 
 /**
@@ -40,7 +40,7 @@ public final class SetInstanceStatusHandler extends 
UpdatableRALBackendHandler<S
         } else {
             checkEnablingIsValid(contextManager, instanceId);
         }
-        contextManager.getInstanceContext().getEventBusContext().post(new 
ComputeNodeStatusChangedEvent(isDisable ? StateType.CIRCUIT_BREAK : 
StateType.OK, instanceId));
+        contextManager.getInstanceContext().getEventBusContext().post(new 
ComputeNodeStatusChangedEvent(instanceId, isDisable ? StateType.CIRCUIT_BREAK : 
StateType.OK));
     }
     
     private void checkEnablingIsValid(final ContextManager contextManager, 
final String instanceId) {
diff --git 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java
similarity index 60%
rename from 
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
rename to 
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java
index fd7d5191ebe..1e0e394fe7a 100644
--- 
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java
@@ -15,20 +15,19 @@
  * limitations under the License.
  */
 
-package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+package org.apache.shardingsphere.proxy.frontend.exception;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.state.StateType;
+import org.apache.shardingsphere.infra.exception.ConnectionSQLException;
+import 
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
 
 /**
- * Compute node status changed event.
+ * Read only exception.
  */
-@RequiredArgsConstructor
-@Getter
-public final class ComputeNodeStatusChangedEvent {
+public final class ReadOnlyException extends ConnectionSQLException {
     
-    private final StateType state;
+    private static final long serialVersionUID = 6339672680026286798L;
     
-    private final String instanceId;
+    public ReadOnlyException() {
+        super(XOpenSQLState.GENERAL_WARNING, 11, "The current instance is 
read-only, Not allowed write traffic.");
+    }
 }
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
index 776cd423594..7f1a4355c3c 100644
--- 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
 import 
org.apache.shardingsphere.proxy.frontend.state.impl.CircuitBreakProxyState;
 import org.apache.shardingsphere.proxy.frontend.state.impl.OKProxyState;
 import org.apache.shardingsphere.proxy.frontend.state.impl.LockProxyState;
+import org.apache.shardingsphere.proxy.frontend.state.impl.ReadOnlyProxyState;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +44,7 @@ public final class ProxyStateContext {
         STATES.put(StateType.OK, new OKProxyState());
         STATES.put(StateType.LOCK, new LockProxyState());
         STATES.put(StateType.CIRCUIT_BREAK, new CircuitBreakProxyState());
+        STATES.put(StateType.READ_ONLY, new ReadOnlyProxyState());
     }
     
     /**
diff --git 
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java
 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java
new file mode 100644
index 00000000000..09bf8761366
--- /dev/null
+++ 
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.state.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import 
org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import 
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.hint.HintManager;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.frontend.exception.ReadOnlyException;
+import 
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
+import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
+import 
org.apache.shardingsphere.sql.parser.sql.dialect.handler.dml.SelectStatementHandler;
+
+import java.util.Optional;
+
+/**
+ * ReadOnly proxy state.
+ *
+ * <p>When the SQL statement context of the session's current query context is classified as a
+ * primary (write) route, {@code execute} writes the error packet built from a
+ * {@link ReadOnlyException}, followed by the optional "other" packet supplied by the command
+ * execute engine.</p>
+ *
+ * <p>A statement is classified as a primary route when any of the following holds:
+ * it is not a {@code SelectStatement}; it is a {@code SelectStatement} with a lock segment;
+ * it is a select whose projections contain a last-insert-id projection;
+ * {@code HintManager.isWriteRouteOnly()} is true; or the context is a
+ * {@code CommonSQLStatementContext} whose {@code isHintWriteRouteOnly()} is true.</p>
+ *
+ * <p>NOTE(review): for statements that are not classified as a primary route, {@code execute}
+ * writes nothing and never uses the {@code message} argument - confirm that such requests are
+ * handled elsewhere.</p>
+ *
+ * <p>NOTE(review): {@code connectionSession.getQueryContext()} is dereferenced without a null
+ * check - confirm it is always populated before this state is executed.</p>
+ */
+public final class ReadOnlyProxyState implements ProxyState {
+    
+    @Override
+    public void execute(final ChannelHandlerContext context, final Object 
message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, 
final ConnectionSession connectionSession) { // Rejects primary-route statements; message is not read in this method.
+        if 
(isPrimaryRoute(connectionSession.getQueryContext().getSqlStatementContext())) {
+            
context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new
 ReadOnlyException()));
+            Optional<DatabasePacket<?>> databasePacket = 
databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(connectionSession);
+            databasePacket.ifPresent(context::writeAndFlush); // Written after the error packet, only when the engine supplies one.
+        } // No else branch: nothing is written for statements that are not a primary route.
+    }
+    
+    private boolean isPrimaryRoute(final SQLStatementContext<?> 
sqlStatementContext) { // True when either the statement itself or a hint requires the write route.
+        return isWriteRouteStatement(sqlStatementContext) || 
isHintWriteRouteOnly(sqlStatementContext);
+    }
+    
+    private boolean isWriteRouteStatement(final SQLStatementContext<?> 
sqlStatementContext) { // True for any non-select statement, and for selects with a lock segment or a last-insert-id projection.
+        SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
+        return containsLockSegment(sqlStatement) || 
containsLastInsertIdProjection(sqlStatementContext) || !(sqlStatement 
instanceof SelectStatement);
+    }
+    
+    private boolean containsLockSegment(final SQLStatement sqlStatement) { // Only select statements are inspected for a lock segment.
+        return sqlStatement instanceof SelectStatement && 
SelectStatementHandler.getLockSegment((SelectStatement) 
sqlStatement).isPresent();
+    }
+    
+    private boolean containsLastInsertIdProjection(final 
SQLStatementContext<?> sqlStatementContext) { // Only select statement contexts carry a projections context to inspect.
+        return sqlStatementContext instanceof SelectStatementContext && 
((SelectStatementContext) 
sqlStatementContext).getProjectionsContext().isContainsLastInsertIdProjection();
+    }
+    
+    private boolean isHintWriteRouteOnly(final SQLStatementContext<?> 
sqlStatementContext) { // Checks HintManager first, then the hint flag exposed by CommonSQLStatementContext.
+        return HintManager.isWriteRouteOnly() || (sqlStatementContext 
instanceof CommonSQLStatementContext && ((CommonSQLStatementContext<?>) 
sqlStatementContext).isHintWriteRouteOnly());
+    }
+}

Reply via email to