This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new b79d2fd9955 Add ReadOnly state for Proxy (#23944)
b79d2fd9955 is described below
commit b79d2fd995583ee5cb02c3b35e7d3a7f62abd2e4
Author: zhaojinchao <[email protected]>
AuthorDate: Sat Feb 4 08:25:16 2023 +0800
Add ReadOnly state for Proxy (#23944)
---
.../user-manual/error-code/sql-error-code.cn.md | 1 +
.../user-manual/error-code/sql-error-code.en.md | 1 +
.../algorithm/DatabaseDiscoveryEngine.java | 26 ++++++--
.../dbdiscovery/heartbeat/HeartbeatJob.java | 6 +-
.../dbdiscovery/rule/DatabaseDiscoveryRule.java | 4 +-
.../shardingsphere/infra/state/StateType.java | 2 +-
.../event/ComputeNodeStatusChangedEvent.java | 6 +-
.../subscriber/ComputeNodeStatusSubscriber.java | 2 +-
.../ral/updatable/SetInstanceStatusHandler.java | 4 +-
.../frontend/exception/ReadOnlyException.java | 19 +++---
.../proxy/frontend/state/ProxyStateContext.java | 2 +
.../frontend/state/impl/ReadOnlyProxyState.java | 70 ++++++++++++++++++++++
12 files changed, 115 insertions(+), 28 deletions(-)
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
index cdb54cda0ce..c620aef7976 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.cn.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.cn.md
@@ -51,6 +51,7 @@ SQL 错误码以标准的 SQL State,Vendor Code 和详细错误信息提供,
| --------- | ----------- | ------ |
| 08000 | 13000 | Can not register driver, reason is: %s |
| 01000 | 13010 | Circuit break open, the request has been ignored. |
+| 01000 | 13011 | The current instance is read-only, Not allowed
write traffic. |
| 08000 | 13020 | Can not get %d connections one time, partition
succeed connection(%d) have released. Please consider increasing the
\`maxPoolSize\` of the data sources or decreasing the
\`max-connections-size-per-query\` in properties. |
| 08000 | 13030 | Connection has been closed. |
| 08000 | 13031 | Result set has been closed. |
diff --git a/docs/document/content/user-manual/error-code/sql-error-code.en.md
b/docs/document/content/user-manual/error-code/sql-error-code.en.md
index f8b13c497a1..84b6e67783b 100644
--- a/docs/document/content/user-manual/error-code/sql-error-code.en.md
+++ b/docs/document/content/user-manual/error-code/sql-error-code.en.md
@@ -51,6 +51,7 @@ SQL error codes provide by standard `SQL State`, `Vendor
Code` and `Reason`, whi
| --------- | ----------- | ------ |
| 08000 | 13000 | Can not register driver, reason is: %s |
| 01000 | 13010 | Circuit break open, the request has been ignored. |
+| 01000 | 13011 | The current instance is read-only, Not allowed
write traffic. |
| 08000 | 13020 | Can not get %d connections one time, partition
succeed connection(%d) have released. Please consider increasing the
\`maxPoolSize\` of the data sources or decreasing the
\`max-connections-size-per-query\` in properties. |
| 08000 | 13030 | Connection has been closed. |
| 08000 | 13031 | Result set has been closed. |
diff --git
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
index 73d279d6eab..694e0533bf0 100644
---
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
+++
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/algorithm/DatabaseDiscoveryEngine.java
@@ -17,13 +17,16 @@
package org.apache.shardingsphere.dbdiscovery.algorithm;
+import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider;
import org.apache.shardingsphere.dbdiscovery.spi.ReplicaDataSourceStatus;
import org.apache.shardingsphere.infra.datasource.state.DataSourceState;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import
org.apache.shardingsphere.infra.metadata.database.schema.QualifiedDatabase;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.state.StateType;
+import
org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeDataSource;
import org.apache.shardingsphere.mode.metadata.storage.StorageNodeRole;
import
org.apache.shardingsphere.mode.metadata.storage.event.DataSourceDisabledEvent;
@@ -46,7 +49,7 @@ public final class DatabaseDiscoveryEngine {
private final DatabaseDiscoveryProvider provider;
- private final EventBusContext eventBusContext;
+ private final InstanceContext instanceContext;
/**
* Check environment of database cluster.
@@ -71,6 +74,7 @@ public final class DatabaseDiscoveryEngine {
public String changePrimaryDataSource(final String databaseName, final
String groupName, final String originalPrimaryDataSourceName,
final Map<String, DataSource>
dataSourceMap, final Collection<String> disabledDataSourceNames) {
Optional<String> newPrimaryDataSourceName =
findPrimaryDataSourceName(dataSourceMap);
+ postComputeNodeStatusChangedEvent(newPrimaryDataSourceName.orElse(""));
newPrimaryDataSourceName.ifPresent(optional ->
postPrimaryChangedEvent(databaseName, groupName, originalPrimaryDataSourceName,
optional));
Map<String, DataSource> replicaDataSourceMap = new
HashMap<>(dataSourceMap);
newPrimaryDataSourceName.ifPresent(replicaDataSourceMap::remove);
@@ -91,9 +95,19 @@ public final class DatabaseDiscoveryEngine {
return Optional.empty();
}
+    /**
+     * Post a compute node status changed event for the current instance when the
+     * outcome of primary data source discovery disagrees with the instance state.
+     *
+     * <p>No primary name while the instance is {@code OK} posts {@code READ_ONLY};
+     * a primary name while the instance is {@code READ_ONLY} posts {@code OK};
+     * every other combination posts nothing.</p>
+     *
+     * @param newPrimaryDataSourceName discovered primary data source name, null or empty when none was found
+     */
+    private void postComputeNodeStatusChangedEvent(final String newPrimaryDataSourceName) {
+        boolean primaryMissing = Strings.isNullOrEmpty(newPrimaryDataSourceName);
+        StateType currentState = instanceContext.getInstance().getState().getCurrentState();
+        // Only the state that contradicts the discovery result triggers a transition.
+        StateType stateToLeave = primaryMissing ? StateType.OK : StateType.READ_ONLY;
+        if (stateToLeave != currentState) {
+            return;
+        }
+        StateType stateToEnter = primaryMissing ? StateType.READ_ONLY : StateType.OK;
+        instanceContext.getEventBusContext().post(new ComputeNodeStatusChangedEvent(instanceContext.getInstance().getCurrentInstanceId(), stateToEnter));
+    }
+
private void postPrimaryChangedEvent(final String databaseName, final
String groupName, final String originalPrimaryDataSourceName, final String
newPrimaryDataSourceName) {
if (!newPrimaryDataSourceName.equals(originalPrimaryDataSourceName)) {
- eventBusContext.post(new PrimaryDataSourceChangedEvent(new
QualifiedDatabase(databaseName, groupName, newPrimaryDataSourceName)));
+ instanceContext.getEventBusContext().post(new
PrimaryDataSourceChangedEvent(new QualifiedDatabase(databaseName, groupName,
newPrimaryDataSourceName)));
}
}
@@ -104,16 +118,16 @@ public final class DatabaseDiscoveryEngine {
StorageNodeDataSource replicaStorageNode =
createReplicaStorageNode(loadReplicaStatus(entry.getValue()));
if (DataSourceState.ENABLED == replicaStorageNode.getStatus()) {
enabledReplicasCount +=
disabledDataSourceNames.contains(entry.getKey()) ? 1 : 0;
- eventBusContext.post(new DataSourceDisabledEvent(databaseName,
groupName, entry.getKey(), replicaStorageNode));
+ instanceContext.getEventBusContext().post(new
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(),
replicaStorageNode));
continue;
}
if (provider.getMinEnabledReplicas().isPresent() && 0 ==
provider.getMinEnabledReplicas().get()) {
- eventBusContext.post(new DataSourceDisabledEvent(databaseName,
groupName, entry.getKey(), replicaStorageNode));
+ instanceContext.getEventBusContext().post(new
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(),
replicaStorageNode));
continue;
}
if (enabledReplicasCount > provider.getMinEnabledReplicas().get())
{
enabledReplicasCount -=
disabledDataSourceNames.contains(entry.getKey()) ? 0 : 1;
- eventBusContext.post(new DataSourceDisabledEvent(databaseName,
groupName, entry.getKey(), replicaStorageNode));
+ instanceContext.getEventBusContext().post(new
DataSourceDisabledEvent(databaseName, groupName, entry.getKey(),
replicaStorageNode));
}
}
}
diff --git
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
index cab23c449bf..67ac474b253 100644
---
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
+++
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/heartbeat/HeartbeatJob.java
@@ -22,7 +22,7 @@ import
org.apache.shardingsphere.dbdiscovery.algorithm.DatabaseDiscoveryEngine;
import org.apache.shardingsphere.dbdiscovery.spi.DatabaseDiscoveryProvider;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
-import org.apache.shardingsphere.infra.util.eventbus.EventBusContext;
+import org.apache.shardingsphere.infra.instance.InstanceContext;
import javax.sql.DataSource;
import java.util.Collection;
@@ -46,11 +46,11 @@ public final class HeartbeatJob implements SimpleJob {
private final Collection<String> disabledDataSourceNames;
- private final EventBusContext eventBusContext;
+ private final InstanceContext instanceContext;
@Override
public void execute(final ShardingContext shardingContext) {
- new DatabaseDiscoveryEngine(databaseDiscoveryProvider,
eventBusContext).changePrimaryDataSource(databaseName, groupName,
originalPrimaryDataSourceName,
+ new DatabaseDiscoveryEngine(databaseDiscoveryProvider,
instanceContext).changePrimaryDataSource(databaseName, groupName,
originalPrimaryDataSourceName,
dataSourceMap, disabledDataSourceNames);
}
}
diff --git
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
index d0a3063f13d..252481d87d8 100644
---
a/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
+++
b/features/db-discovery/core/src/main/java/org/apache/shardingsphere/dbdiscovery/rule/DatabaseDiscoveryRule.java
@@ -112,7 +112,7 @@ public final class DatabaseDiscoveryRule implements
DatabaseRule, DataSourceCont
String groupName = entry.getKey();
DatabaseDiscoveryDataSourceRule dataSourceRule = entry.getValue();
Map<String, DataSource> originalDataSourceMap =
dataSourceRule.getDataSourceGroup(dataSourceMap);
- DatabaseDiscoveryEngine engine = new
DatabaseDiscoveryEngine(dataSourceRule.getProvider(),
instanceContext.getEventBusContext());
+ DatabaseDiscoveryEngine engine = new
DatabaseDiscoveryEngine(dataSourceRule.getProvider(), instanceContext);
engine.checkEnvironment(databaseName, originalDataSourceMap);
dataSourceRule.changePrimaryDataSourceName(engine.changePrimaryDataSource(
databaseName, groupName,
entry.getValue().getPrimaryDataSourceName(), originalDataSourceMap,
dataSourceRule.getDisabledDataSourceNames()));
@@ -177,7 +177,7 @@ public final class DatabaseDiscoveryRule implements
DatabaseRule, DataSourceCont
DatabaseDiscoveryDataSourceRule rule = entry.getValue();
String jobName = rule.getProvider().getType() + "-" + databaseName
+ "-" + rule.getGroupName();
CronJob job = new CronJob(jobName, each -> new
HeartbeatJob(databaseName, rule.getGroupName(),
rule.getPrimaryDataSourceName(), rule.getDataSourceGroup(dataSourceMap),
- rule.getProvider(), rule.getDisabledDataSourceNames(),
instanceContext.getEventBusContext()).execute(null),
+ rule.getProvider(), rule.getDisabledDataSourceNames(),
instanceContext).execute(null),
rule.getHeartbeatProps().getProperty("keep-alive-cron"));
scheduleContext.startSchedule(job);
}
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
index e68c1d7e329..e87e24cf687 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/state/StateType.java
@@ -22,5 +22,5 @@ package org.apache.shardingsphere.infra.state;
*/
public enum StateType {
- OK, CIRCUIT_BREAK, LOCK
+ OK, CIRCUIT_BREAK, READ_ONLY, LOCK
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java
similarity index 92%
copy from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
copy to
mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java
index fd7d5191ebe..d7f1df5bb67 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
+++
b/mode/core/src/main/java/org/apache/shardingsphere/mode/metadata/compute/event/ComputeNodeStatusChangedEvent.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+package org.apache.shardingsphere.mode.metadata.compute.event;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -28,7 +28,7 @@ import org.apache.shardingsphere.infra.state.StateType;
@Getter
public final class ComputeNodeStatusChangedEvent {
- private final StateType state;
-
private final String instanceId;
+
+ private final StateType state;
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
index 7f36f2b7acb..97e65f58f14 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/subscriber/ComputeNodeStatusSubscriber.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.stat
import com.google.common.eventbus.Subscribe;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
+import
org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.LabelsChangedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.ComputeNode;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
diff --git
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
index 3ab33a7c853..0c1010fc923 100644
---
a/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
+++
b/proxy/backend/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/SetInstanceStatusHandler.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.infra.state.StateType;
import
org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.util.exception.external.sql.type.generic.UnsupportedSQLOperationException;
import org.apache.shardingsphere.mode.manager.ContextManager;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ComputeNodeStatusChangedEvent;
+import
org.apache.shardingsphere.mode.metadata.compute.event.ComputeNodeStatusChangedEvent;
import
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.UpdatableRALBackendHandler;
/**
@@ -40,7 +40,7 @@ public final class SetInstanceStatusHandler extends
UpdatableRALBackendHandler<S
} else {
checkEnablingIsValid(contextManager, instanceId);
}
- contextManager.getInstanceContext().getEventBusContext().post(new
ComputeNodeStatusChangedEvent(isDisable ? StateType.CIRCUIT_BREAK :
StateType.OK, instanceId));
+ contextManager.getInstanceContext().getEventBusContext().post(new
ComputeNodeStatusChangedEvent(instanceId, isDisable ? StateType.CIRCUIT_BREAK :
StateType.OK));
}
private void checkEnablingIsValid(final ContextManager contextManager,
final String instanceId) {
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java
similarity index 60%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
rename to
proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java
index fd7d5191ebe..1e0e394fe7a 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/event/ComputeNodeStatusChangedEvent.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/exception/ReadOnlyException.java
@@ -15,20 +15,19 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event;
+package org.apache.shardingsphere.proxy.frontend.exception;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.infra.state.StateType;
+import org.apache.shardingsphere.infra.exception.ConnectionSQLException;
+import
org.apache.shardingsphere.infra.util.exception.external.sql.sqlstate.XOpenSQLState;
/**
- * Compute node status changed event.
+ * Read only exception.
*/
-@RequiredArgsConstructor
-@Getter
-public final class ComputeNodeStatusChangedEvent {
+public final class ReadOnlyException extends ConnectionSQLException {
- private final StateType state;
+ private static final long serialVersionUID = 6339672680026286798L;
- private final String instanceId;
+ public ReadOnlyException() {
+ super(XOpenSQLState.GENERAL_WARNING, 11, "The current instance is
read-only, Not allowed write traffic.");
+ }
}
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
index 776cd423594..7f1a4355c3c 100644
---
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/ProxyStateContext.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngi
import
org.apache.shardingsphere.proxy.frontend.state.impl.CircuitBreakProxyState;
import org.apache.shardingsphere.proxy.frontend.state.impl.OKProxyState;
import org.apache.shardingsphere.proxy.frontend.state.impl.LockProxyState;
+import org.apache.shardingsphere.proxy.frontend.state.impl.ReadOnlyProxyState;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +44,7 @@ public final class ProxyStateContext {
STATES.put(StateType.OK, new OKProxyState());
STATES.put(StateType.LOCK, new LockProxyState());
STATES.put(StateType.CIRCUIT_BREAK, new CircuitBreakProxyState());
+ STATES.put(StateType.READ_ONLY, new ReadOnlyProxyState());
}
/**
diff --git
a/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java
new file mode 100644
index 00000000000..09bf8761366
--- /dev/null
+++
b/proxy/frontend/core/src/main/java/org/apache/shardingsphere/proxy/frontend/state/impl/ReadOnlyProxyState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.proxy.frontend.state.impl;
+
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
+import
org.apache.shardingsphere.infra.binder.statement.CommonSQLStatementContext;
+import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
+import
org.apache.shardingsphere.infra.binder.statement.dml.SelectStatementContext;
+import org.apache.shardingsphere.infra.hint.HintManager;
+import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
+import org.apache.shardingsphere.proxy.frontend.exception.ReadOnlyException;
+import
org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
+import org.apache.shardingsphere.proxy.frontend.state.ProxyState;
+import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
+import
org.apache.shardingsphere.sql.parser.sql.common.statement.dml.SelectStatement;
+import
org.apache.shardingsphere.sql.parser.sql.dialect.handler.dml.SelectStatementHandler;
+
+import java.util.Optional;
+
+/**
+ * ReadOnly proxy state.
+ *
+ * <p>Statements that must be routed to a primary (write) data source are answered with a
+ * {@link ReadOnlyException} error packet. All other statements are delegated to
+ * {@link OKProxyState}, the state registered for {@code StateType.OK}, so that they are
+ * still processed instead of being dropped without a response.</p>
+ */
+public final class ReadOnlyProxyState implements ProxyState {
+    
+    private final ProxyState delegate = new OKProxyState();
+    
+    /**
+     * Reject write-route statements and delegate everything else.
+     *
+     * @param context channel handler context used to write the error response
+     * @param message incoming message, passed through untouched when delegating
+     * @param databaseProtocolFrontendEngine frontend engine that builds the protocol specific packets
+     * @param connectionSession connection session whose query context is inspected
+     */
+    @Override
+    public void execute(final ChannelHandlerContext context, final Object message, final DatabaseProtocolFrontendEngine databaseProtocolFrontendEngine, final ConnectionSession connectionSession) {
+        // NOTE(review): assumes connectionSession.getQueryContext() is non-null and already describes this message - TODO confirm against the caller.
+        if (!isPrimaryRoute(connectionSession.getQueryContext().getSqlStatementContext())) {
+            // Without this branch a non-write statement was neither executed nor answered.
+            delegate.execute(context, message, databaseProtocolFrontendEngine, connectionSession);
+            return;
+        }
+        context.writeAndFlush(databaseProtocolFrontendEngine.getCommandExecuteEngine().getErrorPacket(new ReadOnlyException()));
+        Optional<DatabasePacket<?>> databasePacket = databaseProtocolFrontendEngine.getCommandExecuteEngine().getOtherPacket(connectionSession);
+        databasePacket.ifPresent(context::writeAndFlush);
+    }
+    
+    /**
+     * Judge whether the statement has to go to a primary data source.
+     *
+     * @param sqlStatementContext SQL statement context
+     * @return true if the statement itself is a write-route statement or a hint forces write routing
+     */
+    private boolean isPrimaryRoute(final SQLStatementContext<?> sqlStatementContext) {
+        return isWriteRouteStatement(sqlStatementContext) || isHintWriteRouteOnly(sqlStatementContext);
+    }
+    
+    /**
+     * Judge whether the statement is treated as a write: anything that is not a SELECT,
+     * a SELECT with a lock segment, or a SELECT with a last-insert-id projection.
+     *
+     * @param sqlStatementContext SQL statement context
+     * @return true if the statement is treated as a write
+     */
+    private boolean isWriteRouteStatement(final SQLStatementContext<?> sqlStatementContext) {
+        SQLStatement sqlStatement = sqlStatementContext.getSqlStatement();
+        return containsLockSegment(sqlStatement) || containsLastInsertIdProjection(sqlStatementContext) || !(sqlStatement instanceof SelectStatement);
+    }
+    
+    /**
+     * Judge whether the statement is a SELECT that carries a lock segment.
+     *
+     * @param sqlStatement SQL statement
+     * @return true if a lock segment is present on a SELECT statement
+     */
+    private boolean containsLockSegment(final SQLStatement sqlStatement) {
+        return sqlStatement instanceof SelectStatement && SelectStatementHandler.getLockSegment((SelectStatement) sqlStatement).isPresent();
+    }
+    
+    /**
+     * Judge whether the SELECT projections contain a last-insert-id projection.
+     *
+     * @param sqlStatementContext SQL statement context
+     * @return true if the context is a select context whose projections contain a last-insert-id projection
+     */
+    private boolean containsLastInsertIdProjection(final SQLStatementContext<?> sqlStatementContext) {
+        return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).getProjectionsContext().isContainsLastInsertIdProjection();
+    }
+    
+    /**
+     * Judge whether write routing is forced by a hint, either through {@link HintManager}
+     * or through the statement context itself.
+     *
+     * @param sqlStatementContext SQL statement context
+     * @return true if a hint requests write-only routing
+     */
+    private boolean isHintWriteRouteOnly(final SQLStatementContext<?> sqlStatementContext) {
+        return HintManager.isWriteRouteOnly() || (sqlStatementContext instanceof CommonSQLStatementContext && ((CommonSQLStatementContext<?>) sqlStatementContext).isHintWriteRouteOnly());
+    }
+}