This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 590b043c1ca Simplify worker id generator (#18941)
590b043c1ca is described below

commit 590b043c1ca0cb153dbbe16e507aacb239c1b70b
Author: gin <[email protected]>
AuthorDate: Thu Jul 7 22:05:47 2022 +0800

    Simplify worker id generator (#18941)
---
 .../keygen/fixture/WorkerIdGeneratorFixture.java   |  5 ----
 .../keygen/fixture/WorkerIdGeneratorFixture.java   |  5 ----
 .../infra/instance/InstanceContext.java            | 12 ++++----
 .../infra/instance/workerid/WorkerIdGenerator.java | 11 ++------
 .../infra/instance/InstanceContextTest.java        | 11 ++------
 .../instance/fixture/WorkerIdGeneratorFixture.java |  5 ----
 .../builder/global/GlobalRulesBuilderTest.java     |  3 +-
 .../mode/metadata/persist/node/ComputeNode.java    |  9 ++++++
 .../compute/service/ComputeNodeStatusService.java  | 17 ++++++++++++
 .../generator/ClusterWorkerIdGenerator.java        | 32 +++++++++++++++++-----
 .../ClusterContextManagerCoordinatorTest.java      |  8 ------
 .../generator/MemoryWorkerIdGenerator.java         |  6 ----
 .../generator/StandaloneWorkerIdGenerator.java     |  6 ----
 13 files changed, 64 insertions(+), 66 deletions(-)

diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
index f9d76e364fa..07d8e765d19 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-core/src/test/java/org/apache/shardingsphere/sharding/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
@@ -27,11 +27,6 @@ public final class WorkerIdGeneratorFixture implements 
WorkerIdGenerator {
     
     private final long fixtureWorkerId;
     
-    @Override
-    public long generate() {
-        return fixtureWorkerId;
-    }
-    
     @Override
     public long generate(final Properties props) {
         return fixtureWorkerId;
diff --git 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/test/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/test/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
index 57010608c06..3673013cfd5 100644
--- 
a/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/test/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
+++ 
b/shardingsphere-features/shardingsphere-sharding/shardingsphere-sharding-plugin/shardingsphere-sharding-cosid/src/test/java/org/apache/shardingsphere/sharding/cosid/algorithm/keygen/fixture/WorkerIdGeneratorFixture.java
@@ -27,11 +27,6 @@ public final class WorkerIdGeneratorFixture implements 
WorkerIdGenerator {
     
     private final long fixtureWorkerId;
     
-    @Override
-    public long generate() {
-        return fixtureWorkerId;
-    }
-    
     @Override
     public long generate(final Properties props) {
         return fixtureWorkerId;
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
index 9f2eb4c4fad..352bb7c8f2d 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/InstanceContext.java
@@ -105,13 +105,8 @@ public final class InstanceContext {
      * Get worker id.
      *
      * @return worker id
-     * @deprecated remove it when worker-id refactor was completed
      */
-    @Deprecated
     public long getWorkerId() {
-        if (null == instance.getWorkerId()) {
-            
Optional.of(workerIdGenerator.generate()).ifPresent(instance::setWorkerId);
-        }
         return instance.getWorkerId();
     }
     
@@ -122,7 +117,12 @@ public final class InstanceContext {
      * @return worker id
      */
     public long generateWorkerId(final Properties props) {
-        return workerIdGenerator.generate(props);
+        Long result = instance.getWorkerId();
+        if (null == result) {
+            result = workerIdGenerator.generate(props);
+            instance.setWorkerId(result);
+        }
+        return result;
     }
     
     /**
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
index 1b2d3b1c280..a3694ec7a8c 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/main/java/org/apache/shardingsphere/infra/instance/workerid/WorkerIdGenerator.java
@@ -27,16 +27,9 @@ public interface WorkerIdGenerator {
     
     String WORKER_ID_KEY = "worker-id";
     
-    long DEFAULT_WORKER_ID = 0;
+    long DEFAULT_WORKER_ID = 0L;
     
-    /**
-     * Generate worker id.
-     *
-     * @return worker id
-     * @deprecated remove it when worker-id refactor was completed
-     */
-    @Deprecated
-    long generate();
+    long MAX_WORKER_ID = 1023L;
     
     /**
      * Generate worker id.
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
index 9d627394dbb..bfb32c6eac0 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/InstanceContextTest.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashSet;
-import java.util.Random;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -62,15 +62,10 @@ public final class InstanceContextTest {
     }
     
     @Test
-    public void assertUpdateWorkerId() {
+    public void assertGenerateWorkerId() {
         InstanceContext context = new InstanceContext(new 
ComputeNodeInstance(mock(InstanceMetaData.class)), new 
WorkerIdGeneratorFixture(Long.MIN_VALUE), modeConfig, lockContext);
-        long actual = context.getWorkerId();
+        long actual = context.generateWorkerId(new Properties());
         assertThat(actual, is(Long.MIN_VALUE));
-        Random random = new Random();
-        Long expected = random.nextLong();
-        context.updateWorkerId(expected);
-        actual = context.getWorkerId();
-        assertThat(actual, is(expected));
     }
     
     @Test
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/fixture/WorkerIdGeneratorFixture.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/fixture/WorkerIdGeneratorFixture.java
index ead4fa2527a..e851c61bd2b 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/fixture/WorkerIdGeneratorFixture.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/instance/fixture/WorkerIdGeneratorFixture.java
@@ -27,11 +27,6 @@ public final class WorkerIdGeneratorFixture implements 
WorkerIdGenerator {
     
     private final long fixtureWorkerId;
     
-    @Override
-    public long generate() {
-        return fixtureWorkerId;
-    }
-    
     @Override
     public long generate(final Properties props) {
         return fixtureWorkerId;
diff --git 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/rule/builder/global/GlobalRulesBuilderTest.java
 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/rule/builder/global/GlobalRulesBuilderTest.java
index 05d23973e97..49a65934ad3 100644
--- 
a/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/rule/builder/global/GlobalRulesBuilderTest.java
+++ 
b/shardingsphere-infra/shardingsphere-infra-common/src/test/java/org/apache/shardingsphere/infra/rule/builder/global/GlobalRulesBuilderTest.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Properties;
 import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -70,7 +71,7 @@ public final class GlobalRulesBuilderTest {
     
     private WorkerIdGenerator createWorkerIdGenerator() {
         WorkerIdGenerator result = mock(WorkerIdGenerator.class);
-        when(result.generate()).thenReturn(0L);
+        when(result.generate(new Properties())).thenReturn(0L);
         return result;
     }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
index 9aac8dd9494..835eb42400f 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/ComputeNode.java
@@ -120,6 +120,15 @@ public final class ComputeNode {
         return String.join("/", "", ROOT_NODE, COMPUTE_NODE, WORKER_ID, 
instanceId);
     }
     
+    /**
+     * Get instance worker id root node path.
+     *
+     * @return worker id root node path
+     */
+    public static String getInstanceWorkerIdRootNodePath() {
+        return String.join("/", "", ROOT_NODE, COMPUTE_NODE, WORKER_ID);
+    }
+    
     /**
      * Get instance id by compute node path.
      * 
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
index e311a4e9a0f..c197e90a2d7 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/status/compute/service/ComputeNodeStatusService.java
@@ -30,8 +30,11 @@ import 
org.apache.shardingsphere.mode.repository.cluster.ClusterPersistRepositor
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -146,4 +149,18 @@ public final class ComputeNodeStatusService {
         
loadInstanceWorkerId(instanceMetaData.getId()).ifPresent(result::setWorkerId);
         return result;
     }
+    
+    /**
+     * Get used worker ids.
+     *
+     * @return used worker ids
+     */
+    public Set<Long> getUsedWorkerIds() {
+        Set<Long> result = new LinkedHashSet<>();
+        List<String> childrenKeys = 
repository.getChildrenKeys(ComputeNode.getInstanceWorkerIdRootNodePath());
+        for (String each : childrenKeys) {
+            
result.add(Long.parseLong(repository.get(ComputeNode.getInstanceWorkerIdNodePath(each))));
+        }
+        return result;
+    }
 }
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
index ceeabe48a47..a69a369a82a 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/workerid/generator/ClusterWorkerIdGenerator.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.work
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.infra.exception.ShardingSphereException;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceMetaData;
 import org.apache.shardingsphere.infra.instance.workerid.WorkerIdGenerator;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.RegistryCenter;
@@ -35,30 +36,47 @@ import java.util.Properties;
 @RequiredArgsConstructor
 public final class ClusterWorkerIdGenerator implements WorkerIdGenerator {
     
+    private static final int MAX_RE_TRY = 3;
+    
     private final ClusterPersistRepository repository;
     
     private final RegistryCenter registryCenter;
     
     private final InstanceMetaData instanceMetaData;
     
-    @Override
-    public long generate() {
-        return 
registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
-    }
-    
     @Override
     public long generate(final Properties props) {
-        long result = generate();
+        long result = 
registryCenter.getComputeNodeStatusService().loadInstanceWorkerId(instanceMetaData.getId()).orElseGet(this::reGenerate);
         checkConfigured(result, props);
         return result;
     }
     
     private long reGenerate() {
-        long result = 
Long.parseLong(Optional.ofNullable(repository.getSequentialId(WorkerIdNode.getWorkerIdGeneratorPath(instanceMetaData.getId()),
 "")).orElse("0"));
+        long result;
+        int reTryCount = 0;
+        do {
+            reTryCount++;
+            result = generateSequentialId();
+            if (result > MAX_WORKER_ID) {
+                result = result % 1024L;
+            }
+            if (reTryCount > MAX_RE_TRY) {
+                throw new ShardingSphereException("System assigned work-id 
failed, assigned work-id was {}", result);
+            }
+        } while (isExist(result));
         
registryCenter.getComputeNodeStatusService().persistInstanceWorkerId(instanceMetaData.getId(),
 result);
         return result;
     }
     
+    private long generateSequentialId() {
+        String sequentialId = 
repository.getSequentialId(WorkerIdNode.getWorkerIdGeneratorPath(instanceMetaData.getId()),
 "");
+        return null == sequentialId ? DEFAULT_WORKER_ID : 
Long.parseLong(sequentialId);
+    }
+    
+    private boolean isExist(final long workerId) {
+        return 
registryCenter.getComputeNodeStatusService().getUsedWorkerIds().contains(workerId);
+    }
+    
     private void checkConfigured(final long generatedWorkerId, final 
Properties props) {
         Optional<Long> configuredWorkerId = parseWorkerId(props);
         if (configuredWorkerId.isPresent()) {
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
index 3648d1079e2..7e54e2ee929 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/test/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinatorTest.java
@@ -68,7 +68,6 @@ import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.statu
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListTriggerEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.ShowProcessListUnitCompleteEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.StateEvent;
-import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.compute.event.WorkerIdEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.PrimaryStateChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.status.storage.event.StorageNodeChangedEvent;
 import 
org.apache.shardingsphere.mode.manager.cluster.coordinator.util.ReflectionUtil;
@@ -302,13 +301,6 @@ public final class ClusterContextManagerCoordinatorTest {
         
assertThat(contextManager.getInstanceContext().getInstance().getState().getCurrentState(),
 is(StateType.CIRCUIT_BREAK));
     }
     
-    @Test
-    public void assertRenewWorkerIdChange() {
-        WorkerIdEvent mockWorkerIdEvent = new 
WorkerIdEvent(contextManager.getInstanceContext().getInstance().getInstanceMetaData().getId(),
 12223L);
-        coordinator.renew(mockWorkerIdEvent);
-        assertThat(contextManager.getInstanceContext().getWorkerId(), 
is(12223L));
-    }
-    
     @Test
     public void assertRenewInstanceLabels() {
         Collection<String> labels = Collections.singleton("test");
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/workerid/generator/MemoryWorkerIdGenerator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/workerid/generator/MemoryWorkerIdGenerator.java
index 18d5417c382..9fd0b0b3f53 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/workerid/generator/MemoryWorkerIdGenerator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-memory-mode/shardingsphere-memory-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/memory/workerid/generator/MemoryWorkerIdGenerator.java
@@ -26,12 +26,6 @@ import java.util.Properties;
  */
 public final class MemoryWorkerIdGenerator implements WorkerIdGenerator {
     
-    @Override
-    public long generate() {
-        // TODO need to support custom configuration of worker-id
-        return DEFAULT_WORKER_ID;
-    }
-    
     @Override
     public long generate(final Properties props) {
         return parseWorkerId(props).orElse(DEFAULT_WORKER_ID);
diff --git 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
index 481a63d5b8f..e46c112013a 100644
--- 
a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
+++ 
b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-standalone-mode/shardingsphere-standalone-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/standalone/workerid/generator/StandaloneWorkerIdGenerator.java
@@ -28,12 +28,6 @@ import java.util.Properties;
  */
 public final class StandaloneWorkerIdGenerator implements WorkerIdGenerator {
     
-    @Override
-    public long generate() {
-        // TODO need to support custom configuration of worker-id
-        return DEFAULT_WORKER_ID;
-    }
-    
     @Override
     public long generate(final Properties props) {
         Optional<Long> result = parseWorkerId(props);

Reply via email to