This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1cfc95dccbb Refactor ComputeNodeInstanceContext (#31816)
1cfc95dccbb is described below
commit 1cfc95dccbba093b45c817ed39f4ee085bf3ffb3
Author: Haoran Meng <[email protected]>
AuthorDate: Mon Jun 24 10:06:06 2024 +0800
Refactor ComputeNodeInstanceContext (#31816)
---
.../infra/instance/ComputeNodeInstanceContext.java | 47 +++++++++++++++++++---
.../cluster/ClusterContextManagerBuilder.java | 9 ++---
2 files changed, 46 insertions(+), 10 deletions(-)
diff --git
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java
index d0a9e53575d..b8f3b33235c 100644
---
a/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java
+++
b/infra/common/src/main/java/org/apache/shardingsphere/infra/instance/ComputeNodeInstanceContext.java
@@ -17,9 +17,9 @@
package org.apache.shardingsphere.infra.instance;
+import com.google.common.base.Preconditions;
import lombok.AccessLevel;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -35,11 +35,11 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Compute node instance context.
*/
-@RequiredArgsConstructor
@Getter
@ThreadSafe
public final class ComputeNodeInstanceContext {
@@ -47,17 +47,42 @@ public final class ComputeNodeInstanceContext {
private final ComputeNodeInstance instance;
@Getter(AccessLevel.NONE)
- private final WorkerIdGenerator workerIdGenerator;
+ private final AtomicReference<WorkerIdGenerator> workerIdGenerator = new
AtomicReference<>();
private final ModeConfiguration modeConfiguration;
@SuppressWarnings("rawtypes")
- private final LockContext lockContext;
+ @Getter(AccessLevel.NONE)
+ private final AtomicReference<LockContext> lockContext = new
AtomicReference<>();
private final EventBusContext eventBusContext;
private final Collection<ComputeNodeInstance> allClusterInstances = new
CopyOnWriteArrayList<>();
+ public ComputeNodeInstanceContext(final ComputeNodeInstance instance,
final WorkerIdGenerator workerIdGenerator, final ModeConfiguration
modeConfiguration,
+ final LockContext lockContext, final
EventBusContext eventBusContext) {
+ this.instance = instance;
+ this.workerIdGenerator.set(workerIdGenerator);
+ this.modeConfiguration = modeConfiguration;
+ this.lockContext.set(lockContext);
+ this.eventBusContext = eventBusContext;
+ }
+
+ public ComputeNodeInstanceContext(final ComputeNodeInstance instance,
final ModeConfiguration modeConfiguration, final EventBusContext
eventBusContext) {
+ this(instance, null, modeConfiguration, null, eventBusContext);
+ }
+
+ /**
+ * Initialize compute node instance context.
+ *
+ * @param workerIdGenerator worker id generator
+ * @param lockContext lock context
+ */
+ public void init(final WorkerIdGenerator workerIdGenerator, final
LockContext lockContext) {
+ this.workerIdGenerator.set(workerIdGenerator);
+ this.lockContext.set(lockContext);
+ }
+
/**
* Update instance status.
*
@@ -131,7 +156,8 @@ public final class ComputeNodeInstanceContext {
* @return worker id
*/
public int generateWorkerId(final Properties props) {
- int result = workerIdGenerator.generate(props);
+ Preconditions.checkArgument(workerIdGenerator.get() != null, "Worker
id generator is not initialized.");
+ int result = workerIdGenerator.get().generate(props);
instance.setWorkerId(result);
return result;
}
@@ -190,4 +216,15 @@ public final class ComputeNodeInstanceContext {
public boolean isCluster() {
return "Cluster".equals(modeConfiguration.getType());
}
+
+ /**
+ * Get lock context.
+ *
+ * @return lock context
+ * @throws IllegalStateException if lock context is not initialized
+ */
+ @SuppressWarnings("rawtypes")
+ public LockContext getLockContext() throws IllegalStateException {
+ return Optional.ofNullable(lockContext.get()).orElseThrow(() -> new
IllegalStateException("Lock context is not initialized."));
+ }
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index 3cbd555ce6b..7ee47880de6 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -55,9 +55,10 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
public ContextManager build(final ContextManagerBuilderParameter param,
final EventBusContext eventBusContext) throws SQLException {
ModeConfiguration modeConfig = param.getModeConfiguration();
ClusterPersistRepositoryConfiguration config =
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
+ ComputeNodeInstanceContext computeNodeInstanceContext =
buildComputeNodeInstanceContext(modeConfig, param.getInstanceMetaData(),
eventBusContext, param.getLabels());
ClusterPersistRepository repository =
getClusterPersistRepository(config);
- ComputeNodeInstanceContext computeNodeInstanceContext =
buildComputeNodeInstanceContext(modeConfig, param.getInstanceMetaData(),
repository, eventBusContext, param.getLabels());
repository.init(config, computeNodeInstanceContext);
+ computeNodeInstanceContext.init(new
ClusterWorkerIdGenerator(repository, param.getInstanceMetaData().getId()), new
GlobalLockContext(new GlobalLockPersistService(repository)));
MetaDataPersistService metaDataPersistService = new
MetaDataPersistService(repository);
MetaDataContexts metaDataContexts =
MetaDataContextsFactory.create(metaDataPersistService, param,
computeNodeInstanceContext);
ContextManager result = new ContextManager(metaDataContexts,
computeNodeInstanceContext, repository);
@@ -71,10 +72,8 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
}
private ComputeNodeInstanceContext buildComputeNodeInstanceContext(final
ModeConfiguration modeConfig, final InstanceMetaData instanceMetaData,
- final
ClusterPersistRepository repository, final EventBusContext eventBusContext,
final Collection<String> labels) {
- return new ComputeNodeInstanceContext(new
ComputeNodeInstance(instanceMetaData, labels),
- new ClusterWorkerIdGenerator(repository,
instanceMetaData.getId()), modeConfig,
- new GlobalLockContext(new
GlobalLockPersistService(repository)), eventBusContext);
+ final
EventBusContext eventBusContext, final Collection<String> labels) {
+ return new ComputeNodeInstanceContext(new
ComputeNodeInstance(instanceMetaData, labels), modeConfig, eventBusContext);
}
private void registerOnline(final EventBusContext eventBusContext, final
ComputeNodeInstanceContext computeNodeInstanceContext,