This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 48413303752 Add ReservationPersistService to reserve worker id for
ClusterWorkerIdGenerator (#31981)
48413303752 is described below
commit 48413303752bb1f6853e9890fd7524c9993531ba
Author: Haoran Meng <[email protected]>
AuthorDate: Thu Jul 4 17:35:49 2024 +0800
Add ReservationPersistService to reserve worker id for
ClusterWorkerIdGenerator (#31981)
---
.../metadata/persist/node/ReservationNode.java | 6 +--
.../listener/MetaDataChangedListener.java | 4 +-
.../ComputeNodeStateDispatchEventBuilder.java | 4 +-
.../dispatch/CacheEvictedSubscriber.java | 2 +-
.../workerid/ClusterWorkerIdGenerator.java | 17 +++-----
.../cluster/persist/ReservationPersistService.java | 49 ++++++++++++++++++++++
...ationNodeTest.java => ReservationNodeTest.java} | 5 ++-
.../dispatch/CacheEvictedSubscriberTest.java | 2 +-
8 files changed, 67 insertions(+), 22 deletions(-)
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNode.java
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ReservationNode.java
similarity index 89%
rename from
mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNode.java
rename to
kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ReservationNode.java
index 0cbfa6b6127..44611bdae95 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNode.java
+++
b/kernel/metadata/core/src/main/java/org/apache/shardingsphere/metadata/persist/node/ReservationNode.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node;
+package org.apache.shardingsphere.metadata.persist.node;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
/**
- * Worker id reservation node.
+ * Reservation node.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
-public final class WorkerIdReservationNode {
+public final class ReservationNode {
private static final String ROOT_NODE = "reservation";
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
index ecdc89b566a..18ac369a014 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/listener/MetaDataChangedListener.java
@@ -56,10 +56,10 @@ public final class MetaDataChangedListener implements
DataChangedEventListener {
@Override
public void onChange(final DataChangedEvent event) {
- createGovernanceEvent(event).ifPresent(eventBusContext::post);
+ createDispatchEvent(event).ifPresent(eventBusContext::post);
}
- private Optional<DispatchEvent> createGovernanceEvent(final
DataChangedEvent event) {
+ private Optional<DispatchEvent> createDispatchEvent(final DataChangedEvent
event) {
String key = event.getKey();
Optional<String> databaseName =
DatabaseMetaDataNode.getDatabaseNameBySchemaNode(key);
Optional<String> schemaName = DatabaseMetaDataNode.getSchemaName(key);
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
index 4c21fb1a2b4..7c334108657 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/builder/ComputeNodeStateDispatchEventBuilder.java
@@ -58,7 +58,7 @@ public final class ComputeNodeStateDispatchEventBuilder
implements DispatchEvent
public Optional<DispatchEvent> build(final DataChangedEvent event) {
String instanceId =
ComputeNode.getInstanceIdByComputeNode(event.getKey());
if (!Strings.isNullOrEmpty(instanceId)) {
- Optional<DispatchEvent> result =
createInstanceGovernanceEvent(event, instanceId);
+ Optional<DispatchEvent> result =
createInstanceDispatchEvent(event, instanceId);
if (result.isPresent()) {
return result;
}
@@ -73,7 +73,7 @@ public final class ComputeNodeStateDispatchEventBuilder
implements DispatchEvent
}
@SuppressWarnings("unchecked")
- private Optional<DispatchEvent> createInstanceGovernanceEvent(final
DataChangedEvent event, final String instanceId) {
+ private Optional<DispatchEvent> createInstanceDispatchEvent(final
DataChangedEvent event, final String instanceId) {
if
(event.getKey().equals(ComputeNode.getComputeNodeStateNodePath(instanceId)) &&
Type.DELETED != event.getType()) {
return Optional.of(new
ComputeNodeInstanceStateChangedEvent(instanceId, event.getValue()));
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
index 142bb48a91d..a64b6a4d800 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriber.java
@@ -34,7 +34,7 @@ public final class CacheEvictedSubscriber implements
EventSubscriber {
* @param ignored unused
*/
@Subscribe
- public void onGovernanceEvent(final DispatchEvent ignored) {
+ public void cleanCache(final DispatchEvent ignored) {
OrderedServicesCache.clearCache();
}
}
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
index dd449f523dd..b2d2a11908c 100644
---
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/instance/workerid/ClusterWorkerIdGenerator.java
@@ -22,10 +22,9 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import
org.apache.shardingsphere.infra.instance.workerid.WorkerIdAssignedException;
import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
-import
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node.WorkerIdReservationNode;
-import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
-import
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
+import
org.apache.shardingsphere.mode.manager.cluster.persist.ReservationPersistService;
import
org.apache.shardingsphere.mode.persist.service.ComputeNodePersistService;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
import java.util.Collection;
import java.util.Optional;
@@ -41,18 +40,18 @@ import java.util.stream.IntStream;
@Slf4j
public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
- private final ClusterPersistRepository repository;
-
private final String instanceId;
private final ComputeNodePersistService computeNodePersistService;
+ private final ReservationPersistService reservationPersistService;
+
private final AtomicBoolean isWarned = new AtomicBoolean(false);
public ClusterWorkerIdGenerator(final ClusterPersistRepository repository,
final String instanceId) {
- this.repository = repository;
this.instanceId = instanceId;
computeNodePersistService = new ComputeNodePersistService(repository);
+ reservationPersistService = new ReservationPersistService(repository);
}
@Override
@@ -82,11 +81,7 @@ public final class ClusterWorkerIdGenerator implements
WorkerIdGenerator {
PriorityQueue<Integer> availableWorkerIds = IntStream.range(0,
1024).boxed().filter(each ->
!assignedWorkerIds.contains(each)).collect(Collectors.toCollection(PriorityQueue::new));
Integer preselectedWorkerId = availableWorkerIds.poll();
Preconditions.checkNotNull(preselectedWorkerId, "Preselected worker-id
can not be null.");
- try {
- return
repository.persistExclusiveEphemeral(WorkerIdReservationNode.getWorkerIdReservationPath(preselectedWorkerId),
instanceId) ? Optional.of(preselectedWorkerId) : Optional.empty();
- } catch (final ClusterPersistRepositoryException ignore) {
- return Optional.empty();
- }
+ return reservationPersistService.reserveWorkerId(preselectedWorkerId,
instanceId);
}
private void logWarning(final int generatedWorkerId, final Properties
props) {
diff --git
a/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/ReservationPersistService.java
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/ReservationPersistService.java
new file mode 100644
index 00000000000..90058e37eb1
--- /dev/null
+++
b/mode/type/cluster/core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/persist/ReservationPersistService.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.persist;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.metadata.persist.node.ReservationNode;
+import
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepository;
+import
org.apache.shardingsphere.mode.repository.cluster.exception.ClusterPersistRepositoryException;
+
+import java.util.Optional;
+
+/**
+ * Reservation persist service.
+ */
+@RequiredArgsConstructor
+public final class ReservationPersistService {
+
+ private final ClusterPersistRepository repository;
+
+ /**
+ * Reserve worker ID.
+ *
+ * @param preselectedWorkerId preselected worker ID
+ * @param instanceId instance ID
+ * @return worker ID
+ */
+ public Optional<Integer> reserveWorkerId(final Integer
preselectedWorkerId, final String instanceId) {
+ try {
+ return
repository.persistExclusiveEphemeral(ReservationNode.getWorkerIdReservationPath(preselectedWorkerId),
instanceId) ? Optional.of(preselectedWorkerId) : Optional.empty();
+ } catch (final ClusterPersistRepositoryException ignore) {
+ return Optional.empty();
+ }
+ }
+}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNodeTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/ReservationNodeTest.java
similarity index 84%
rename from
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNodeTest.java
rename to
mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/ReservationNodeTest.java
index 07ec342f810..c569f535065 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/WorkerIdReservationNodeTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/node/ReservationNodeTest.java
@@ -17,15 +17,16 @@
package
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.workerid.node;
+import org.apache.shardingsphere.metadata.persist.node.ReservationNode;
import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
-class WorkerIdReservationNodeTest {
+class ReservationNodeTest {
@Test
void assertGetWorkerIdReservationPath() {
- assertThat(WorkerIdReservationNode.getWorkerIdReservationPath(1),
is("/reservation/worker_id/1"));
+ assertThat(ReservationNode.getWorkerIdReservationPath(1),
is("/reservation/worker_id/1"));
}
}
diff --git
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
index b81437212b3..db0a4c2b025 100644
---
a/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
+++
b/mode/type/cluster/core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/event/subscriber/dispatch/CacheEvictedSubscriberTest.java
@@ -29,7 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
class CacheEvictedSubscriberTest {
@Test
- void assertOnGovernanceEvent() {
+ void assertClearCache() {
EventBusContext eventBusContext = new EventBusContext();
eventBusContext.register(new CacheEvictedSubscriber());
OrderedServicesCache.cacheServices(getClass(),
Collections.emptyList(), Collections.emptyMap());