This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch register-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/register-refactor by this push:
     new 891a52c  Refactor es id generation mechanism.
891a52c is described below

commit 891a52c69e137b87d5dda121b56ca71a8dba7a3c
Author: Wu Sheng <[email protected]>
AuthorDate: Wed Jan 23 15:30:55 2019 +0800

    Refactor es id generation mechanism.
---
 .../register/worker/RegisterPersistentWorker.java  |  2 +-
 .../oap/server/core/storage/IRegisterLockDAO.java  |  3 +-
 .../StorageModuleElasticsearchProvider.java        |  2 +-
 .../elasticsearch/lock/RegisterLockDAOImpl.java    | 66 +++++++---------------
 .../elasticsearch/lock/RegisterLockIndex.java      |  2 -
 .../elasticsearch/lock/RegisterLockInstaller.java  |  8 ---
 .../plugin/jdbc/h2/dao/H2RegisterLockDAO.java      |  3 +-
 7 files changed, 25 insertions(+), 61 deletions(-)

diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 4b471b0..35a83b4 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -78,7 +78,7 @@ public class RegisterPersistentWorker extends 
AbstractWorker<RegisterSource> {
                         }
                     } else {
                         int sequence;
-                        if ((sequence = registerLockDAO.lockAndGetId(scope)) 
!= Const.NONE) {
+                        if ((sequence = registerLockDAO.getId(scope, 
registerSource)) != Const.NONE) {
                             try {
                                 dbSource = registerDAO.get(modelName, 
source.id());
                                 if (Objects.nonNull(dbSource)) {
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
index 881a81d..2edaa8f 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
@@ -18,6 +18,7 @@
 
 package org.apache.skywalking.oap.server.core.storage;
 
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.source.Scope;
 
 /**
@@ -34,5 +35,5 @@ public interface IRegisterLockDAO extends DAO {
      * @param scope for the id. IDs at different scopes could be same, but 
unique in same scope.
      * @return Unique ID.
      */
-    int lockAndGetId(Scope scope);
+    int getId(Scope scope, RegisterSource registerSource);
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index ba133d1..8f31dba 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -66,7 +66,7 @@ public class StorageModuleElasticsearchProvider extends 
ModuleProvider {
 
         this.registerServiceImplementation(IBatchDAO.class, new 
BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), 
config.getBulkSize(), config.getFlushInterval(), 
config.getConcurrentRequests()));
         this.registerServiceImplementation(StorageDAO.class, new 
StorageEsDAO(elasticSearchClient));
-        this.registerServiceImplementation(IRegisterLockDAO.class, new 
RegisterLockDAOImpl(elasticSearchClient, 10 * 60 * 1000));
+        this.registerServiceImplementation(IRegisterLockDAO.class, new 
RegisterLockDAOImpl(elasticSearchClient));
         this.registerServiceImplementation(IHistoryDeleteDAO.class, new 
HistoryDeleteEsDAO(elasticSearchClient));
 
         this.registerServiceImplementation(IServiceInventoryCacheDAO.class, 
new ServiceInventoryCacheEsDAO(elasticSearchClient));
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
index 1c33d5d..3f7fdd0 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
@@ -20,8 +20,8 @@ package 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.source.Scope;
 import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
 import 
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -37,68 +37,40 @@ public class RegisterLockDAOImpl extends EsDAO implements 
IRegisterLockDAO {
 
     private static final Logger logger = 
LoggerFactory.getLogger(RegisterLockDAOImpl.class);
 
-    private final int timeout;
-
-    public RegisterLockDAOImpl(ElasticSearchClient client, int timeout) {
+    public RegisterLockDAOImpl(ElasticSearchClient client) {
         super(client);
-        this.timeout = timeout;
     }
 
-    @Override public int lockAndGetId(Scope scope) {
-        try {
-            String id = String.valueOf(scope.ordinal());
+    @Override public int getId(Scope scope, RegisterSource registerSource) {
+        String id = String.valueOf(scope.ordinal());
 
-            int sequence = Const.NONE;
-            try {
-                GetResponse response = getClient().get(RegisterLockIndex.NAME, 
id);
-                if (response.isExists()) {
-                    Map<String, Object> source = response.getSource();
+        int sequence = Const.NONE;
+        try {
+            GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
+            if (response.isExists()) {
+                Map<String, Object> source = response.getSource();
 
-                    long expire = 
((Number)source.get(RegisterLockIndex.COLUMN_EXPIRE)).longValue();
-                    boolean lockable = 
(boolean)source.get(RegisterLockIndex.COLUMN_LOCKABLE);
-                    sequence = 
((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
-                    long version = response.getVersion();
+                sequence = 
((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
+                long version = response.getVersion();
 
-                    sequence++;
+                sequence++;
 
-                    if (lockable || System.currentTimeMillis() > expire) {
-                        lock(id, sequence, timeout, version);
-                    } else {
-                        TimeUnit.SECONDS.sleep(1);
-                        return Const.NONE;
-                    }
-                }
-            } catch (Throwable t) {
-                logger.warn("Try to lock the row with the id {} failure, error 
message: {}", id, t.getMessage());
-                return Const.NONE;
+                lock(id, sequence, version);
             }
-            return sequence;
-        } finally {
-            releaseLock(scope);
+        } catch (Throwable t) {
+            logger.warn("Try to lock the row with the id {} failure, error 
message: {}", id, t.getMessage());
+            return Const.NONE;
         }
+        return sequence;
     }
 
-    private void lock(String id, int sequence, int timeout, long version) 
throws IOException {
+    private void lock(String id, int sequence, long version) throws 
IOException {
         XContentBuilder source = XContentFactory.jsonBuilder().startObject();
-        source.field(RegisterLockIndex.COLUMN_EXPIRE, 
System.currentTimeMillis() + timeout);
-        source.field(RegisterLockIndex.COLUMN_LOCKABLE, false);
         source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
         source.endObject();
 
         getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
     }
+}
 
-    public void releaseLock(Scope scope) {
-        String id = String.valueOf(scope.ordinal());
-
-        try {
-            XContentBuilder source = 
XContentFactory.jsonBuilder().startObject();
-            source.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
-            source.endObject();
 
-            getClient().forceUpdate(RegisterLockIndex.NAME, id, source);
-        } catch (Throwable t) {
-            logger.error("{} inventory release lock failure.", scope.name(), 
t);
-        }
-    }
-}
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
index dc2ffa9..fd972a5 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
@@ -24,7 +24,5 @@ package 
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
 public class RegisterLockIndex {
 
     public static final String NAME = "register_lock";
-    public static final String COLUMN_EXPIRE = "expire";
-    public static final String COLUMN_LOCKABLE = "lockable";
     public static final String COLUMN_SEQUENCE = "sequence";
 }
diff --git 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index 2dec7cc..cf6d253 100644
--- 
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++ 
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -78,12 +78,6 @@ public class RegisterLockInstaller {
         XContentBuilder source = XContentFactory.jsonBuilder()
             .startObject()
             .startObject("properties")
-            .startObject(RegisterLockIndex.COLUMN_EXPIRE)
-            .field("type", "long")
-            .endObject()
-            .startObject(RegisterLockIndex.COLUMN_LOCKABLE)
-            .field("type", "boolean")
-            .endObject()
             .startObject(RegisterLockIndex.COLUMN_SEQUENCE)
             .field("type", "integer")
             .endObject()
@@ -97,8 +91,6 @@ public class RegisterLockInstaller {
         GetResponse response = client.get(RegisterLockIndex.NAME, 
String.valueOf(scopeId));
         if (!response.isExists()) {
             XContentBuilder builder = 
XContentFactory.jsonBuilder().startObject();
-            builder.field(RegisterLockIndex.COLUMN_EXPIRE, Long.MIN_VALUE);
-            builder.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
             builder.field(RegisterLockIndex.COLUMN_SEQUENCE, 1);
             builder.endObject();
 
diff --git 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
index 19224a5..58937df 100644
--- 
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
+++ 
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
@@ -20,6 +20,7 @@ package 
org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
 
 import java.sql.*;
 import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
 import org.apache.skywalking.oap.server.core.source.Scope;
 import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
 import 
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
@@ -41,7 +42,7 @@ public class H2RegisterLockDAO implements IRegisterLockDAO {
         this.h2Client = h2Client;
     }
 
-    @Override public int lockAndGetId(Scope scope) {
+    @Override public int getId(Scope scope, RegisterSource registerSource) {
         try (Connection connection = h2Client.getTransactionConnection()) {
             ResultSet resultSet = h2Client.executeQuery(connection, "select 
sequence from " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " where id = " + 
scope.ordinal() + " for update");
             while (resultSet.next()) {

Reply via email to