This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch register-refactor
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/register-refactor by this push:
new 891a52c Refactor es id generation mechanism.
891a52c is described below
commit 891a52c69e137b87d5dda121b56ca71a8dba7a3c
Author: Wu Sheng <[email protected]>
AuthorDate: Wed Jan 23 15:30:55 2019 +0800
Refactor es id generation mechanism.
---
.../register/worker/RegisterPersistentWorker.java | 2 +-
.../oap/server/core/storage/IRegisterLockDAO.java | 3 +-
.../StorageModuleElasticsearchProvider.java | 2 +-
.../elasticsearch/lock/RegisterLockDAOImpl.java | 66 +++++++---------------
.../elasticsearch/lock/RegisterLockIndex.java | 2 -
.../elasticsearch/lock/RegisterLockInstaller.java | 8 ---
.../plugin/jdbc/h2/dao/H2RegisterLockDAO.java | 3 +-
7 files changed, 25 insertions(+), 61 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 4b471b0..35a83b4 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -78,7 +78,7 @@ public class RegisterPersistentWorker extends
AbstractWorker<RegisterSource> {
}
} else {
int sequence;
- if ((sequence = registerLockDAO.lockAndGetId(scope))
!= Const.NONE) {
+ if ((sequence = registerLockDAO.getId(scope,
registerSource)) != Const.NONE) {
try {
dbSource = registerDAO.get(modelName,
source.id());
if (Objects.nonNull(dbSource)) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
index 881a81d..2edaa8f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
/**
@@ -34,5 +35,5 @@ public interface IRegisterLockDAO extends DAO {
* @param scope for the id. IDs at different scopes could be same, but
unique in same scope.
* @return Unique ID.
*/
- int lockAndGetId(Scope scope);
+ int getId(Scope scope, RegisterSource registerSource);
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index ba133d1..8f31dba 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -66,7 +66,7 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new
BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(),
config.getBulkSize(), config.getFlushInterval(),
config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new
StorageEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IRegisterLockDAO.class, new
RegisterLockDAOImpl(elasticSearchClient, 10 * 60 * 1000));
+ this.registerServiceImplementation(IRegisterLockDAO.class, new
RegisterLockDAOImpl(elasticSearchClient));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new
HistoryDeleteEsDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class,
new ServiceInventoryCacheEsDAO(elasticSearchClient));
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
index 1c33d5d..3f7fdd0 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
@@ -20,8 +20,8 @@ package
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import java.io.IOException;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -37,68 +37,40 @@ public class RegisterLockDAOImpl extends EsDAO implements
IRegisterLockDAO {
private static final Logger logger =
LoggerFactory.getLogger(RegisterLockDAOImpl.class);
- private final int timeout;
-
- public RegisterLockDAOImpl(ElasticSearchClient client, int timeout) {
+ public RegisterLockDAOImpl(ElasticSearchClient client) {
super(client);
- this.timeout = timeout;
}
- @Override public int lockAndGetId(Scope scope) {
- try {
- String id = String.valueOf(scope.ordinal());
+ @Override public int getId(Scope scope, RegisterSource registerSource) {
+ String id = String.valueOf(scope.ordinal());
- int sequence = Const.NONE;
- try {
- GetResponse response = getClient().get(RegisterLockIndex.NAME,
id);
- if (response.isExists()) {
- Map<String, Object> source = response.getSource();
+ int sequence = Const.NONE;
+ try {
+ GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
+ if (response.isExists()) {
+ Map<String, Object> source = response.getSource();
- long expire =
((Number)source.get(RegisterLockIndex.COLUMN_EXPIRE)).longValue();
- boolean lockable =
(boolean)source.get(RegisterLockIndex.COLUMN_LOCKABLE);
- sequence =
((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
- long version = response.getVersion();
+ sequence =
((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
+ long version = response.getVersion();
- sequence++;
+ sequence++;
- if (lockable || System.currentTimeMillis() > expire) {
- lock(id, sequence, timeout, version);
- } else {
- TimeUnit.SECONDS.sleep(1);
- return Const.NONE;
- }
- }
- } catch (Throwable t) {
- logger.warn("Try to lock the row with the id {} failure, error
message: {}", id, t.getMessage());
- return Const.NONE;
+ lock(id, sequence, version);
}
- return sequence;
- } finally {
- releaseLock(scope);
+ } catch (Throwable t) {
+ logger.warn("Try to lock the row with the id {} failure, error
message: {}", id, t.getMessage());
+ return Const.NONE;
}
+ return sequence;
}
- private void lock(String id, int sequence, int timeout, long version)
throws IOException {
+ private void lock(String id, int sequence, long version) throws
IOException {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
- source.field(RegisterLockIndex.COLUMN_EXPIRE,
System.currentTimeMillis() + timeout);
- source.field(RegisterLockIndex.COLUMN_LOCKABLE, false);
source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
}
+}
- public void releaseLock(Scope scope) {
- String id = String.valueOf(scope.ordinal());
-
- try {
- XContentBuilder source =
XContentFactory.jsonBuilder().startObject();
- source.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
- source.endObject();
- getClient().forceUpdate(RegisterLockIndex.NAME, id, source);
- } catch (Throwable t) {
- logger.error("{} inventory release lock failure.", scope.name(),
t);
- }
- }
-}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
index dc2ffa9..fd972a5 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
@@ -24,7 +24,5 @@ package
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
public class RegisterLockIndex {
public static final String NAME = "register_lock";
- public static final String COLUMN_EXPIRE = "expire";
- public static final String COLUMN_LOCKABLE = "lockable";
public static final String COLUMN_SEQUENCE = "sequence";
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index 2dec7cc..cf6d253 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -78,12 +78,6 @@ public class RegisterLockInstaller {
XContentBuilder source = XContentFactory.jsonBuilder()
.startObject()
.startObject("properties")
- .startObject(RegisterLockIndex.COLUMN_EXPIRE)
- .field("type", "long")
- .endObject()
- .startObject(RegisterLockIndex.COLUMN_LOCKABLE)
- .field("type", "boolean")
- .endObject()
.startObject(RegisterLockIndex.COLUMN_SEQUENCE)
.field("type", "integer")
.endObject()
@@ -97,8 +91,6 @@ public class RegisterLockInstaller {
GetResponse response = client.get(RegisterLockIndex.NAME,
String.valueOf(scopeId));
if (!response.isExists()) {
XContentBuilder builder =
XContentFactory.jsonBuilder().startObject();
- builder.field(RegisterLockIndex.COLUMN_EXPIRE, Long.MIN_VALUE);
- builder.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
builder.field(RegisterLockIndex.COLUMN_SEQUENCE, 1);
builder.endObject();
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
index 19224a5..58937df 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
@@ -20,6 +20,7 @@ package
org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.sql.*;
import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
@@ -41,7 +42,7 @@ public class H2RegisterLockDAO implements IRegisterLockDAO {
this.h2Client = h2Client;
}
- @Override public int lockAndGetId(Scope scope) {
+ @Override public int getId(Scope scope, RegisterSource registerSource) {
try (Connection connection = h2Client.getTransactionConnection()) {
ResultSet resultSet = h2Client.executeQuery(connection, "select
sequence from " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " where id = " +
scope.ordinal() + " for update");
while (resultSet.next()) {