This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new aa5ea904ac3 Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository (#31450)
aa5ea904ac3 is described below
commit aa5ea904ac37516553010dc3346d700e6ba7bf7e
Author: Haoran Meng <[email protected]>
AuthorDate: Thu May 30 22:59:52 2024 +0800
Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository (#31450)
* Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository
* Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository
* Remove ComputeNodeInstanceContextAware interface for ClusterPersistRepository
---
.../manager/cluster/ClusterContextManagerBuilder.java | 12 ++++--------
.../fixture/ClusterPersistRepositoryFixture.java | 3 ++-
.../ProcessListClusterPersistRepositoryFixture.java | 3 ++-
.../repository/cluster/ClusterPersistRepository.java | 4 +++-
.../mode/repository/cluster/etcd/EtcdRepository.java | 7 ++++---
.../cluster/zookeeper/ZookeeperRepository.java | 11 +++--------
.../cluster/zookeeper/ZookeeperRepositoryTest.java | 16 +++++++++++-----
.../proxy/fixture/ClusterPersistRepositoryFixture.java | 3 ++-
.../it/data/pipeline/core/util/PipelineContextUtils.java | 5 ++++-
9 files changed, 35 insertions(+), 29 deletions(-)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
index f6409ee689d..9c17de92bf4 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/ClusterContextManagerBuilder.java
@@ -21,7 +21,6 @@ import
org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstance;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
-import
org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware;
import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
import
org.apache.shardingsphere.infra.instance.metadata.jdbc.JDBCInstanceMetaData;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -60,11 +59,10 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
@Override
public ContextManager build(final ContextManagerBuilderParameter param,
final EventBusContext eventBusContext) throws SQLException {
ModeConfiguration modeConfig = param.getModeConfiguration();
- ClusterPersistRepository repository =
getClusterPersistRepository((ClusterPersistRepositoryConfiguration)
modeConfig.getRepository());
+ ClusterPersistRepositoryConfiguration config =
(ClusterPersistRepositoryConfiguration) modeConfig.getRepository();
+ ClusterPersistRepository repository =
getClusterPersistRepository(config);
ComputeNodeInstanceContext computeNodeInstanceContext =
buildComputeNodeInstanceContext(modeConfig, param.getInstanceMetaData(),
repository, eventBusContext);
- if (repository instanceof ComputeNodeInstanceContextAware) {
- ((ComputeNodeInstanceContextAware)
repository).setComputeNodeInstanceContext(computeNodeInstanceContext);
- }
+ repository.init(config, computeNodeInstanceContext);
MetaDataPersistService metaDataPersistService = new
MetaDataPersistService(repository);
MetaDataContexts metaDataContexts =
MetaDataContextsFactory.create(metaDataPersistService, param,
computeNodeInstanceContext, new
QualifiedDataSourceStatusService(repository).loadStatus());
ContextManager result = new ContextManager(metaDataContexts,
computeNodeInstanceContext, repository);
@@ -76,9 +74,7 @@ public final class ClusterContextManagerBuilder implements
ContextManagerBuilder
private ClusterPersistRepository getClusterPersistRepository(final
ClusterPersistRepositoryConfiguration config) {
ShardingSpherePreconditions.checkNotNull(config,
MissingRequiredClusterRepositoryConfigurationException::new);
- ClusterPersistRepository result =
TypedSPILoader.getService(ClusterPersistRepository.class, config.getType(),
config.getProps());
- result.init(config);
- return result;
+ return TypedSPILoader.getService(ClusterPersistRepository.class,
config.getType(), config.getProps());
}
private ComputeNodeInstanceContext buildComputeNodeInstanceContext(final
ModeConfiguration modeConfig,
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
index 66e0ae46492..b2d716bb810 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/fixture/ClusterPersistRepositoryFixture.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.manager.cluster.coordinator.fixture;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -28,7 +29,7 @@ import java.util.List;
public final class ClusterPersistRepositoryFixture implements
ClusterPersistRepository {
@Override
- public void init(final ClusterPersistRepositoryConfiguration config) {
+ public void init(final ClusterPersistRepositoryConfiguration config, final
ComputeNodeInstanceContext computeNodeInstanceContext) {
}
@Override
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
index c4b40c48700..9e63ea850fa 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/process/ProcessListClusterPersistRepositoryFixture.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -32,7 +33,7 @@ public final class ProcessListClusterPersistRepositoryFixture
implements Cluster
private static final Map<String, String> REGISTRY_DATA = new
LinkedHashMap<>();
@Override
- public void init(final ClusterPersistRepositoryConfiguration config) {
+ public void init(final ClusterPersistRepositoryConfiguration config, final
ComputeNodeInstanceContext computeNodeInstanceContext) {
}
@Override
diff --git
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
index b240ecea27b..4a9e5fec657 100644
---
a/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
+++
b/mode/type/cluster/repository/api/src/main/java/org/apache/shardingsphere/mode/repository/cluster/ClusterPersistRepository.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.mode.repository.cluster;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import org.apache.shardingsphere.mode.spi.PersistRepository;
@@ -30,8 +31,9 @@ public interface ClusterPersistRepository extends
PersistRepository {
* Initialize registry center.
*
* @param config cluster persist repository configuration
+ * @param computeNodeInstanceContext compute node instance context
*/
- void init(ClusterPersistRepositoryConfiguration config);
+ void init(ClusterPersistRepositoryConfiguration config,
ComputeNodeInstanceContext computeNodeInstanceContext);
/**
* Persist exclusive ephemeral data.
diff --git
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
index 71ef6405467..999457f7324 100644
---
a/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
+++
b/mode/type/cluster/repository/provider/etcd/src/main/java/org/apache/shardingsphere/mode/repository/cluster/etcd/EtcdRepository.java
@@ -35,12 +35,13 @@ import io.etcd.jetcd.watch.WatchEvent;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
+import org.apache.shardingsphere.mode.event.DataChangedEvent;
+import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdProperties;
import
org.apache.shardingsphere.mode.repository.cluster.etcd.props.EtcdPropertyKey;
-import org.apache.shardingsphere.mode.event.DataChangedEvent;
-import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
@@ -68,7 +69,7 @@ public final class EtcdRepository implements
ClusterPersistRepository {
private DistributedLockHolder distributedLockHolder;
@Override
- public void init(final ClusterPersistRepositoryConfiguration config) {
+ public void init(final ClusterPersistRepositoryConfiguration config, final
ComputeNodeInstanceContext computeNodeInstanceContext) {
etcdProps = new EtcdProperties(config.getProps());
client =
Client.builder().endpoints(Util.toURIs(Splitter.on(",").trimResults().splitToList(config.getServerLists())))
.namespace(ByteSequence.from(config.getNamespace(),
StandardCharsets.UTF_8))
diff --git
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
index 76e20a97f79..1cfae70b242 100644
---
a/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
+++
b/mode/type/cluster/repository/provider/zookeeper/src/main/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepository.java
@@ -28,7 +28,6 @@ import
org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
-import
org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContextAware;
import org.apache.shardingsphere.mode.event.DataChangedEvent;
import org.apache.shardingsphere.mode.event.DataChangedEvent.Type;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
@@ -59,7 +58,7 @@ import java.util.concurrent.TimeUnit;
/**
* Registry repository of ZooKeeper.
*/
-public final class ZookeeperRepository implements ClusterPersistRepository,
ComputeNodeInstanceContextAware {
+public final class ZookeeperRepository implements ClusterPersistRepository {
private final Map<String, CuratorCache> caches = new ConcurrentHashMap<>();
@@ -71,10 +70,11 @@ public final class ZookeeperRepository implements
ClusterPersistRepository, Comp
private DistributedLockHolder distributedLockHolder;
@Override
- public void init(final ClusterPersistRepositoryConfiguration config) {
+ public void init(final ClusterPersistRepositoryConfiguration config, final
ComputeNodeInstanceContext computeNodeInstanceContext) {
ZookeeperProperties zookeeperProps = new
ZookeeperProperties(config.getProps());
client = buildCuratorClient(config, zookeeperProps);
distributedLockHolder = new DistributedLockHolder(getType(), client,
zookeeperProps);
+ client.getConnectionStateListenable().addListener(new
SessionConnectionReconnectListener(computeNodeInstanceContext, this));
initCuratorClient(zookeeperProps);
}
@@ -274,11 +274,6 @@ public final class ZookeeperRepository implements
ClusterPersistRepository, Comp
}
}
- @Override
- public void setComputeNodeInstanceContext(final ComputeNodeInstanceContext
computeNodeInstanceContext) {
- client.getConnectionStateListenable().addListener(new
SessionConnectionReconnectListener(computeNodeInstanceContext, this));
- }
-
@Override
public String getType() {
return "ZooKeeper";
diff --git
a/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java
b/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java
index 6d9f952818f..c282b74c4c4 100644
---
a/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java
+++
b/mode/type/cluster/repository/provider/zookeeper/src/test/java/org/apache/shardingsphere/mode/repository/cluster/zookeeper/ZookeeperRepositoryTest.java
@@ -29,6 +29,8 @@ import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
import
org.apache.curator.framework.api.ProtectACLCreateModeStatPathAndBytesable;
import org.apache.curator.framework.api.SetDataBuilder;
+import org.apache.curator.framework.listen.Listenable;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.lock.holder.DistributedLockHolder;
import
org.apache.shardingsphere.mode.repository.cluster.zookeeper.lock.ZookeeperDistributedLock;
@@ -107,7 +109,7 @@ class ZookeeperRepositoryTest {
mockClient();
mockBuilder();
ClusterPersistRepositoryConfiguration config = new
ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance",
SERVER_LISTS, new Properties());
- REPOSITORY.init(config);
+ REPOSITORY.init(config, mock(ComputeNodeInstanceContext.class));
mockDistributedLockHolder();
}
@@ -141,6 +143,7 @@ class ZookeeperRepositoryTest {
when(client.delete()).thenReturn(deleteBuilder);
when(deleteBuilder.deletingChildrenIfNeeded()).thenReturn(backgroundVersionable);
when(client.getChildren()).thenReturn(getChildrenBuilder);
+
when(client.getConnectionStateListenable()).thenReturn(mock(Listenable.class));
}
@Test
@@ -188,25 +191,28 @@ class ZookeeperRepositoryTest {
new Property(ZookeeperPropertyKey.MAX_RETRIES.getKey(), "1"),
new
Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "1000"),
new
Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "2000"));
- assertDoesNotThrow(() -> REPOSITORY.init(new
ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance",
SERVER_LISTS, props)));
+ assertDoesNotThrow(() -> REPOSITORY.init(new
ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance",
SERVER_LISTS, props),
+ mock(ComputeNodeInstanceContext.class)));
}
@Test
void assertBuildCuratorClientWithTimeToLiveSecondsEqualsZero() {
assertDoesNotThrow(() -> REPOSITORY.init(new
ClusterPersistRepositoryConfiguration(
- REPOSITORY.getType(), "governance", SERVER_LISTS,
PropertiesBuilder.build(new
Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "0")))));
+ REPOSITORY.getType(), "governance", SERVER_LISTS,
PropertiesBuilder.build(new
Property(ZookeeperPropertyKey.TIME_TO_LIVE_SECONDS.getKey(), "0"))),
+ mock(ComputeNodeInstanceContext.class)));
}
@Test
void assertBuildCuratorClientWithOperationTimeoutMillisecondsEqualsZero() {
assertDoesNotThrow(() -> REPOSITORY.init(new
ClusterPersistRepositoryConfiguration(
- REPOSITORY.getType(), "governance", SERVER_LISTS,
PropertiesBuilder.build(new
Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "0")))));
+ REPOSITORY.getType(), "governance", SERVER_LISTS,
PropertiesBuilder.build(new
Property(ZookeeperPropertyKey.OPERATION_TIMEOUT_MILLISECONDS.getKey(), "0"))),
+ mock(ComputeNodeInstanceContext.class)));
}
@Test
void assertBuildCuratorClientWithDigest() {
REPOSITORY.init(new
ClusterPersistRepositoryConfiguration(REPOSITORY.getType(), "governance",
SERVER_LISTS,
- PropertiesBuilder.build(new
Property(ZookeeperPropertyKey.DIGEST.getKey(), "any"))));
+ PropertiesBuilder.build(new
Property(ZookeeperPropertyKey.DIGEST.getKey(), "any"))),
mock(ComputeNodeInstanceContext.class));
verify(builder).aclProvider(any(ACLProvider.class));
}
diff --git
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
index 497530abe69..cf9924afaba 100644
---
a/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
+++
b/proxy/bootstrap/src/test/java/org/apache/shardingsphere/proxy/fixture/ClusterPersistRepositoryFixture.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.proxy.fixture;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositoryConfiguration;
import
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -32,7 +33,7 @@ public final class ClusterPersistRepositoryFixture implements
ClusterPersistRepo
private static final Map<String, String> REGISTRY_DATA = new
LinkedHashMap<>();
@Override
- public void init(final ClusterPersistRepositoryConfiguration config) {
+ public void init(final ClusterPersistRepositoryConfiguration config, final
ComputeNodeInstanceContext computeNodeInstanceContext) {
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index c744d85ca0a..9f4c2847fd6 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -50,6 +50,7 @@ import
org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import org.apache.shardingsphere.infra.instance.ComputeNodeInstanceContext;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
@@ -82,6 +83,8 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import static org.mockito.Mockito.mock;
+
/**
* Pipeline context utility class.
*/
@@ -125,7 +128,7 @@ public final class PipelineContextUtils {
private static ClusterPersistRepository getClusterPersistRepository(final
ClusterPersistRepositoryConfiguration repositoryConfig) {
ClusterPersistRepository result =
TypedSPILoader.getService(ClusterPersistRepository.class,
repositoryConfig.getType(), repositoryConfig.getProps());
- result.init(repositoryConfig);
+ result.init(repositoryConfig, mock(ComputeNodeInstanceContext.class));
return result;
}