This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 16b8236 1. Add a last_update_time attribute to all inventories and
remove the mapping_last_update_time attribute from the service inventory. (#3168)
16b8236 is described below
commit 16b82365067632d291421aad0a6ac14fc025bcae
Author: 彭勇升 pengys <[email protected]>
AuthorDate: Fri Jul 26 14:02:27 2019 +0800
1. Add a last_update_time attribute to all inventories and remove the
mapping_last_update_time attribute from the service inventory. (#3168)
2. Fix the service inventory and network address inventory update bugs.
---
.../apache/skywalking/oap/server/core/Const.java | 1 +
.../oap/server/core/cache/CacheUpdateTimer.java | 54 ++++++++++++++++------
.../server/core/register/EndpointInventory.java | 14 ++++--
.../core/register/NetworkAddressInventory.java | 21 +++++----
.../oap/server/core/register/RegisterSource.java | 17 +++++--
.../core/register/ServiceInstanceInventory.java | 19 ++++----
.../oap/server/core/register/ServiceInventory.java | 45 +++++++++---------
.../service/NetworkAddressInventoryRegister.java | 2 +-
.../register/service/ServiceInventoryRegister.java | 8 ++--
.../cache/INetworkAddressInventoryCacheDAO.java | 3 ++
.../storage/cache/IServiceInventoryCacheDAO.java | 2 +-
.../standardization/ReferenceIdExchanger.java | 4 --
.../parser/standardization/SpanIdExchanger.java | 3 +-
.../server-starter/src/main/resources/log4j2.xml | 2 +-
.../cache/NetworkAddressInventoryCacheEsDAO.java | 21 +++++++++
.../cache/ServiceInventoryCacheEsDAO.java | 6 +--
.../h2/dao/H2NetworkAddressInventoryCacheDAO.java | 35 ++++++++++++--
.../jdbc/h2/dao/H2ServiceInventoryCacheDAO.java | 6 +--
18 files changed, 178 insertions(+), 85 deletions(-)
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
index 566198c..d91a47a 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/Const.java
@@ -36,5 +36,6 @@ public class Const {
public static final String SEGMENT_SPAN_SPLIT = "S";
public static final String UNKNOWN = "Unknown";
public static final String EMPTY_STRING = "";
+ public static final String EMPTY_JSON_OBJECT_STRING = "{}";
public static final String DOMAIN_OPERATION_NAME = "{domain}";
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
index 5aa9dbd..edd4ccf 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/CacheUpdateTimer.java
@@ -22,10 +22,10 @@ import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.register.ServiceInventory;
+import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import
org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.slf4j.*;
/**
@@ -38,34 +38,60 @@ public enum CacheUpdateTimer {
private Boolean isStarted = false;
- public void start(ModuleManager moduleManager) {
- logger.info("Cache update timer start");
+ public void start(ModuleDefineHolder moduleDefineHolder) {
+ logger.info("Cache updateServiceInventory timer start");
- final long timeInterval = 3;
+ final long timeInterval = 10;
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
- new RunnableWithExceptionProtection(() ->
update(moduleManager),
+ new RunnableWithExceptionProtection(() ->
update(moduleDefineHolder),
t -> logger.error("Cache update failure.", t)), 1,
timeInterval, TimeUnit.SECONDS);
this.isStarted = true;
}
}
- private void update(ModuleManager moduleManager) {
- IServiceInventoryCacheDAO serviceInventoryCacheDAO =
moduleManager.find(StorageModule.NAME).provider().getService(IServiceInventoryCacheDAO.class);
- ServiceInventoryCache serviceInventoryCache =
moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
- List<ServiceInventory> serviceInventories =
serviceInventoryCacheDAO.loadLastMappingUpdate();
+ private void update(ModuleDefineHolder moduleDefineHolder) {
+ updateServiceInventory(moduleDefineHolder);
+ updateNetAddressInventory(moduleDefineHolder);
+ }
+
+ private void updateServiceInventory(ModuleDefineHolder moduleDefineHolder)
{
+ IServiceInventoryCacheDAO serviceInventoryCacheDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(IServiceInventoryCacheDAO.class);
+ ServiceInventoryCache serviceInventoryCache =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
+ List<ServiceInventory> serviceInventories =
serviceInventoryCacheDAO.loadLastUpdate(System.currentTimeMillis() - 60000);
serviceInventories.forEach(serviceInventory -> {
- logger.info("Update mapping service id in the cache of service
inventory, service id: {}, mapping service id: {}",
serviceInventory.getSequence(), serviceInventory.getMappingServiceId());
ServiceInventory cache =
serviceInventoryCache.get(serviceInventory.getSequence());
if (Objects.nonNull(cache)) {
-
cache.setMappingServiceId(serviceInventory.getMappingServiceId());
-
cache.setMappingLastUpdateTime(serviceInventory.getMappingLastUpdateTime());
+ if (cache.getMappingServiceId() !=
serviceInventory.getMappingServiceId()) {
+
cache.setMappingServiceId(serviceInventory.getMappingServiceId());
+
cache.setServiceNodeType(serviceInventory.getServiceNodeType());
+ cache.setProperties(serviceInventory.getProperties());
+ logger.info("Update the cache of service inventory,
service id: {}", serviceInventory.getSequence());
+ }
} else {
logger.warn("Unable to found the id of {} in service inventory
cache.", serviceInventory.getSequence());
}
});
}
+
+ private void updateNetAddressInventory(ModuleDefineHolder
moduleDefineHolder) {
+ INetworkAddressInventoryCacheDAO addressInventoryCacheDAO =
moduleDefineHolder.find(StorageModule.NAME).provider().getService(INetworkAddressInventoryCacheDAO.class);
+ NetworkAddressInventoryCache addressInventoryCache =
moduleDefineHolder.find(CoreModule.NAME).provider().getService(NetworkAddressInventoryCache.class);
+ List<NetworkAddressInventory> addressInventories =
addressInventoryCacheDAO.loadLastUpdate(System.currentTimeMillis() - 60000);
+
+ addressInventories.forEach(addressInventory -> {
+ NetworkAddressInventory cache =
addressInventoryCache.get(addressInventory.getSequence());
+ if (Objects.nonNull(cache)) {
+ if
(!cache.getNetworkAddressNodeType().equals(addressInventory.getNetworkAddressNodeType()))
{
+
cache.setNetworkAddressNodeType(addressInventory.getNetworkAddressNodeType());
+ logger.info("Update the cache of net address inventory,
address id: {}", addressInventory.getSequence());
+ }
+ } else {
+ logger.warn("Unable to found the id of {} in net address
inventory cache.", addressInventory.getSequence());
+ }
+ });
+ }
}
\ No newline at end of file
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
index 8e906bc..7f0edae 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
@@ -91,6 +91,7 @@ public class EndpointInventory extends RegisterSource {
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
+ remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ?
Const.EMPTY_STRING : name);
return remoteBuilder;
@@ -103,6 +104,7 @@ public class EndpointInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
+ setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
}
@@ -115,12 +117,13 @@ public class EndpointInventory extends RegisterSource {
@Override public EndpointInventory map2Data(Map<String, Object> dbMap)
{
EndpointInventory inventory = new EndpointInventory();
- inventory.setSequence((Integer)dbMap.get(SEQUENCE));
- inventory.setServiceId((Integer)dbMap.get(SERVICE_ID));
+ inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
+ inventory.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
inventory.setName((String)dbMap.get(NAME));
- inventory.setDetectPoint((Integer)dbMap.get(DETECT_POINT));
- inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
- inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
+
inventory.setDetectPoint(((Number)dbMap.get(DETECT_POINT)).intValue());
+
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
+
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
+
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
return inventory;
}
@@ -132,6 +135,7 @@ public class EndpointInventory extends RegisterSource {
map.put(DETECT_POINT, storageData.getDetectPoint());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
+ map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
return map;
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
index d0f09fa..7a4eb73 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
@@ -88,6 +88,7 @@ public class NetworkAddressInventory extends RegisterSource {
inventory.setSequence(getSequence());
inventory.setRegisterTime(getRegisterTime());
inventory.setHeartbeatTime(getHeartbeatTime());
+ inventory.setLastUpdateTime(getLastUpdateTime());
inventory.setName(name);
inventory.setNodeType(nodeType);
@@ -95,14 +96,14 @@ public class NetworkAddressInventory extends RegisterSource
{
}
@Override public boolean combine(RegisterSource registerSource) {
- boolean isCombine = super.combine(registerSource);
+ boolean isChanged = super.combine(registerSource);
NetworkAddressInventory inventory =
(NetworkAddressInventory)registerSource;
- if (nodeType != inventory.nodeType) {
- setNodeType(inventory.nodeType);
+ if (this.nodeType != inventory.getNodeType() &&
inventory.getLastUpdateTime() >= this.getLastUpdateTime()) {
+ setNodeType(inventory.getNodeType());
return true;
} else {
- return isCombine;
+ return isChanged;
}
}
@@ -113,6 +114,7 @@ public class NetworkAddressInventory extends RegisterSource
{
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
+ remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ?
Const.EMPTY_STRING : name);
return remoteBuilder;
@@ -124,6 +126,7 @@ public class NetworkAddressInventory extends RegisterSource
{
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
+ setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
}
@@ -136,11 +139,12 @@ public class NetworkAddressInventory extends
RegisterSource {
@Override public NetworkAddressInventory map2Data(Map<String, Object>
dbMap) {
NetworkAddressInventory inventory = new NetworkAddressInventory();
- inventory.setSequence((Integer)dbMap.get(SEQUENCE));
+ inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
inventory.setName((String)dbMap.get(NAME));
- inventory.setNodeType((Integer)dbMap.get(NODE_TYPE));
- inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
- inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
+ inventory.setNodeType(((Number)dbMap.get(NODE_TYPE)).intValue());
+
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
+
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
+
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
return inventory;
}
@@ -151,6 +155,7 @@ public class NetworkAddressInventory extends RegisterSource
{
map.put(NODE_TYPE, storageData.getNodeType());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
+ map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
return map;
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
index 210c170..328a060 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
@@ -31,17 +31,24 @@ public abstract class RegisterSource extends StreamData
implements StorageData {
public static final String SEQUENCE = "sequence";
public static final String REGISTER_TIME = "register_time";
public static final String HEARTBEAT_TIME = "heartbeat_time";
+ public static final String LAST_UPDATE_TIME = "last_update_time";
@Getter @Setter @Column(columnName = SEQUENCE) private int sequence;
- @Getter @Setter @Column(columnName = REGISTER_TIME) private long
registerTime;
- @Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long
heartbeatTime;
+ @Getter @Setter @Column(columnName = REGISTER_TIME) private long
registerTime = 0L;
+ @Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long
heartbeatTime = 0L;
+ @Setter @Getter @Column(columnName = LAST_UPDATE_TIME) private long
lastUpdateTime = 0L;
public boolean combine(RegisterSource registerSource) {
+ boolean isChanged = false;
if (heartbeatTime < registerSource.getHeartbeatTime()) {
heartbeatTime = registerSource.getHeartbeatTime();
- return true;
- } else {
- return false;
+ isChanged = true;
}
+
+ if (lastUpdateTime < registerSource.getLastUpdateTime()) {
+ lastUpdateTime = registerSource.getLastUpdateTime();
+ isChanged = true;
+ }
+ return isChanged;
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java
index 28cd596..f5254f0 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInstanceInventory.java
@@ -51,8 +51,7 @@ public class ServiceInstanceInventory extends RegisterSource {
public static final String PROPERTIES = "properties";
private static final Gson GSON = new Gson();
- @Setter @Getter @Column(columnName = INSTANCE_UUID, matchQuery = true)
- private String instanceUUID = Const.EMPTY_STRING;
+ @Setter @Getter @Column(columnName = INSTANCE_UUID, matchQuery = true)
private String instanceUUID = Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = NAME) private String name =
Const.EMPTY_STRING;
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = IS_ADDRESS) private int isAddress;
@@ -133,6 +132,7 @@ public class ServiceInstanceInventory extends
RegisterSource {
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
+ remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ?
Const.EMPTY_STRING : name);
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(instanceUUID) ?
Const.EMPTY_STRING : instanceUUID);
@@ -148,6 +148,7 @@ public class ServiceInstanceInventory extends
RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
+ setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
setInstanceUUID(remoteData.getDataStrings(1));
@@ -162,13 +163,14 @@ public class ServiceInstanceInventory extends
RegisterSource {
@Override public ServiceInstanceInventory map2Data(Map<String, Object>
dbMap) {
ServiceInstanceInventory inventory = new
ServiceInstanceInventory();
- inventory.setSequence((Integer)dbMap.get(SEQUENCE));
- inventory.setServiceId((Integer)dbMap.get(SERVICE_ID));
- inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
- inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
+ inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
+ inventory.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+ inventory.setIsAddress(((Number)dbMap.get(IS_ADDRESS)).intValue());
+ inventory.setAddressId(((Number)dbMap.get(ADDRESS_ID)).intValue());
- inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
- inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
+
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
+
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
+
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
inventory.setName((String)dbMap.get(NAME));
inventory.setInstanceUUID((String)dbMap.get(INSTANCE_UUID));
@@ -185,6 +187,7 @@ public class ServiceInstanceInventory extends
RegisterSource {
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
+ map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
map.put(NAME, storageData.getName());
map.put(INSTANCE_UUID, storageData.getInstanceUUID());
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
index 380d479..5441cc1 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
@@ -47,7 +47,6 @@ public class ServiceInventory extends RegisterSource {
private static final String ADDRESS_ID = "address_id";
public static final String NODE_TYPE = "node_type";
public static final String MAPPING_SERVICE_ID = "mapping_service_id";
- public static final String MAPPING_LAST_UPDATE_TIME =
"mapping_last_update_time";
public static final String PROPERTIES = "properties";
private static final Gson GSON = new Gson();
@@ -56,8 +55,7 @@ public class ServiceInventory extends RegisterSource {
@Setter @Getter @Column(columnName = ADDRESS_ID) private int addressId;
@Setter(AccessLevel.PRIVATE) @Getter(AccessLevel.PRIVATE)
@Column(columnName = NODE_TYPE) private int nodeType;
@Setter @Getter @Column(columnName = MAPPING_SERVICE_ID) private int
mappingServiceId;
- @Setter @Getter @Column(columnName = MAPPING_LAST_UPDATE_TIME) private
long mappingLastUpdateTime;
- @Getter(AccessLevel.PRIVATE) @Column(columnName = PROPERTIES) private
String prop;
+ @Getter(AccessLevel.PRIVATE) @Column(columnName = PROPERTIES) private
String prop = Const.EMPTY_JSON_OBJECT_STRING;
@Getter private JsonObject properties;
public NodeType getServiceNodeType() {
@@ -119,7 +117,7 @@ public class ServiceInventory extends RegisterSource {
inventory.setIsAddress(isAddress);
inventory.setNodeType(nodeType);
inventory.setAddressId(addressId);
- inventory.setMappingLastUpdateTime(mappingLastUpdateTime);
+ inventory.setLastUpdateTime(getLastUpdateTime());
inventory.setMappingServiceId(mappingServiceId);
inventory.setProp(prop);
@@ -155,7 +153,7 @@ public class ServiceInventory extends RegisterSource {
remoteBuilder.addDataLongs(getRegisterTime());
remoteBuilder.addDataLongs(getHeartbeatTime());
- remoteBuilder.addDataLongs(getMappingLastUpdateTime());
+ remoteBuilder.addDataLongs(getLastUpdateTime());
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(name) ?
Const.EMPTY_STRING : name);
remoteBuilder.addDataStrings(Strings.isNullOrEmpty(prop) ?
Const.EMPTY_STRING : prop);
@@ -171,7 +169,7 @@ public class ServiceInventory extends RegisterSource {
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
- setMappingLastUpdateTime(remoteData.getDataLongs(2));
+ setLastUpdateTime(remoteData.getDataLongs(2));
setName(remoteData.getDataStrings(0));
setProp(remoteData.getDataStrings(1));
@@ -183,21 +181,22 @@ public class ServiceInventory extends RegisterSource {
}
@Override public boolean combine(RegisterSource registerSource) {
- super.combine(registerSource);
+ boolean isChanged = super.combine(registerSource);
ServiceInventory serviceInventory = (ServiceInventory)registerSource;
- nodeType = serviceInventory.nodeType;
- setProp(serviceInventory.getProp());
- if (Const.NONE != serviceInventory.getMappingServiceId() &&
serviceInventory.getMappingLastUpdateTime() >= this.getMappingLastUpdateTime())
{
- this.mappingServiceId = serviceInventory.getMappingServiceId();
- this.mappingLastUpdateTime =
serviceInventory.getMappingLastUpdateTime();
+ if (serviceInventory.getLastUpdateTime() >= this.getLastUpdateTime()) {
+ this.nodeType = serviceInventory.getNodeType();
+ setProp(serviceInventory.getProp());
+ if (Const.NONE != serviceInventory.getMappingServiceId()) {
+ this.mappingServiceId = serviceInventory.getMappingServiceId();
+ }
+ isChanged = true;
}
- return true;
+ return isChanged;
}
public static class PropertyUtil {
-
public static final String DATABASE = "database";
}
@@ -205,15 +204,15 @@ public class ServiceInventory extends RegisterSource {
@Override public ServiceInventory map2Data(Map<String, Object> dbMap) {
ServiceInventory inventory = new ServiceInventory();
- inventory.setSequence((Integer)dbMap.get(SEQUENCE));
- inventory.setIsAddress((Integer)dbMap.get(IS_ADDRESS));
-
inventory.setMappingServiceId((Integer)dbMap.get(MAPPING_SERVICE_ID));
+ inventory.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
+ inventory.setIsAddress(((Number)dbMap.get(IS_ADDRESS)).intValue());
+
inventory.setMappingServiceId(((Number)dbMap.get(MAPPING_SERVICE_ID)).intValue());
inventory.setName((String)dbMap.get(NAME));
- inventory.setAddressId((Integer)dbMap.get(ADDRESS_ID));
- inventory.setNodeType((Integer)dbMap.get(NODE_TYPE));
- inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
- inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
-
inventory.setMappingLastUpdateTime((Long)dbMap.get(MAPPING_LAST_UPDATE_TIME));
+ inventory.setAddressId(((Number)dbMap.get(ADDRESS_ID)).intValue());
+ inventory.setNodeType(((Number)dbMap.get(NODE_TYPE)).intValue());
+
inventory.setRegisterTime(((Number)dbMap.get(REGISTER_TIME)).longValue());
+
inventory.setHeartbeatTime(((Number)dbMap.get(HEARTBEAT_TIME)).longValue());
+
inventory.setLastUpdateTime(((Number)dbMap.get(LAST_UPDATE_TIME)).longValue());
inventory.setProp((String)dbMap.get(PROPERTIES));
return inventory;
}
@@ -228,7 +227,7 @@ public class ServiceInventory extends RegisterSource {
map.put(NODE_TYPE, storageData.getNodeType());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
- map.put(MAPPING_LAST_UPDATE_TIME,
storageData.getMappingLastUpdateTime());
+ map.put(LAST_UPDATE_TIME, storageData.getLastUpdateTime());
map.put(PROPERTIES, storageData.getProp());
return map;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
index 4cf831d..2f3e4e6 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
@@ -115,7 +115,7 @@ public class NetworkAddressInventoryRegister implements
INetworkAddressInventory
if (!this.compare(networkAddress, nodeType)) {
NetworkAddressInventory newNetworkAddress =
networkAddress.getClone();
newNetworkAddress.setNetworkAddressNodeType(nodeType);
- newNetworkAddress.setHeartbeatTime(System.currentTimeMillis());
+ newNetworkAddress.setLastUpdateTime(System.currentTimeMillis());
InventoryStreamProcessor.getInstance().in(newNetworkAddress);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java
index bc6deb8..b5640b3 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/ServiceInventoryRegister.java
@@ -64,7 +64,7 @@ public class ServiceInventoryRegister implements
IServiceInventoryRegister {
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
serviceInventory.setMappingServiceId(Const.NONE);
- serviceInventory.setMappingLastUpdateTime(now);
+ serviceInventory.setLastUpdateTime(now);
serviceInventory.setProperties(properties);
InventoryStreamProcessor.getInstance().in(serviceInventory);
@@ -84,7 +84,7 @@ public class ServiceInventoryRegister implements
IServiceInventoryRegister {
long now = System.currentTimeMillis();
serviceInventory.setRegisterTime(now);
serviceInventory.setHeartbeatTime(now);
- serviceInventory.setMappingLastUpdateTime(now);
+ serviceInventory.setLastUpdateTime(now);
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
@@ -98,7 +98,7 @@ public class ServiceInventoryRegister implements
IServiceInventoryRegister {
serviceInventory = serviceInventory.getClone();
serviceInventory.setServiceNodeType(nodeType);
serviceInventory.setProperties(properties);
-
serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis());
+ serviceInventory.setLastUpdateTime(System.currentTimeMillis());
InventoryStreamProcessor.getInstance().in(serviceInventory);
}
@@ -124,7 +124,7 @@ public class ServiceInventoryRegister implements
IServiceInventoryRegister {
if (Objects.nonNull(serviceInventory)) {
serviceInventory = serviceInventory.getClone();
serviceInventory.setMappingServiceId(mappingServiceId);
-
serviceInventory.setMappingLastUpdateTime(System.currentTimeMillis());
+ serviceInventory.setLastUpdateTime(System.currentTimeMillis());
InventoryStreamProcessor.getInstance().in(serviceInventory);
} else {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java
index 614f97e..f14bd3c 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/INetworkAddressInventoryCacheDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.core.storage.cache;
+import java.util.List;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.storage.DAO;
@@ -29,4 +30,6 @@ public interface INetworkAddressInventoryCacheDAO extends DAO
{
int getAddressId(String networkAddress);
NetworkAddressInventory get(int addressId);
+
+ List<NetworkAddressInventory> loadLastUpdate(long lastUpdateTime);
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java
index a1ea367..e547dbd 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/cache/IServiceInventoryCacheDAO.java
@@ -33,5 +33,5 @@ public interface IServiceInventoryCacheDAO extends DAO {
ServiceInventory get(int serviceId);
- List<ServiceInventory> loadLastMappingUpdate();
+ List<ServiceInventory> loadLastUpdate(long lastUpdateTime);
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
index 2e8b98c..f1d72ae 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
@@ -116,10 +116,6 @@ public class ReferenceIdExchanger implements
IdExchanger<ReferenceDecorator> {
* Need to try to get the id by assuming the endpoint name is detected at
server, local or client.
*
* If agent does the exchange, then always use endpoint id.
- *
- * @param serviceId
- * @param endpointName
- * @return
*/
private int getEndpointId(int serviceId, String endpointName) {
int endpointId = endpointInventoryRegister.get(serviceId,
endpointName, DetectPoint.SERVER.ordinal());
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
index 42bf56c..f61850b 100644
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
@@ -103,12 +103,11 @@ public class SpanIdExchanger implements
IdExchanger<SpanDecorator> {
NodeType nodeType = NodeType.fromSpanLayerValue(spanLayerValue);
networkAddressInventoryRegister.update(peerId, nodeType);
- /**
+ /*
* In some case, conjecture node, such as Database node, could be
registered by agents.
* At here, if the target service properties need to be updated,
* it will only be updated at the first time for now.
*/
-
JsonObject properties = null;
ServiceInventory newServiceInventory =
serviceInventoryCacheDAO.get(serviceInventoryCacheDAO.getServiceId(peerId));
if (SpanLayer.Database.equals(standardBuilder.getSpanLayer())) {
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml
b/oap-server/server-starter/src/main/resources/log4j2.xml
index cf83c5b..72dcf43 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -36,7 +36,7 @@
<logger name="org.apache.skywalking.oap.server.core.remote.client"
level="DEBUG"/>
<logger name="org.apache.skywalking.oap.server.library.buffer"
level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.so11y"
level="DEBUG" />
- <Root level="ERROR">
+ <Root level="DEBUG">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java
index 5dfc591..b53d024 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/NetworkAddressInventoryCacheEsDAO.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.cache;
+import java.util.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import
org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
@@ -76,4 +77,24 @@ public class NetworkAddressInventoryCacheEsDAO extends EsDAO
implements INetwork
return null;
}
}
+
+ @Override public List<NetworkAddressInventory> loadLastUpdate(long
lastUpdateTime) { // Loads the network address inventories whose LAST_UPDATE_TIME is >= lastUpdateTime.
+ List<NetworkAddressInventory> addressInventories = new ArrayList<>();
+
+ try {
+ SearchSourceBuilder searchSourceBuilder = new
SearchSourceBuilder();
+
searchSourceBuilder.query(QueryBuilders.rangeQuery(NetworkAddressInventory.LAST_UPDATE_TIME).gte(lastUpdateTime)); // inclusive lower bound on the last-update timestamp
+ searchSourceBuilder.size(500); // requests at most 500 hits; no paging is done in this method
+
+ SearchResponse response =
getClient().search(NetworkAddressInventory.INDEX_NAME, searchSourceBuilder);
+
+ for (SearchHit searchHit : response.getHits().getHits()) {
+
addressInventories.add(this.builder.map2Data(searchHit.getSourceAsMap())); // turn each hit's source map into an inventory entity via this DAO's builder
+ }
+ } catch (Throwable t) { // best effort: any failure is logged and the method still returns the list below
+ logger.error(t.getMessage(), t);
+ }
+
+ return addressInventories; // never null; may be empty or partial if an error was logged above
+ }
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java
index f1f4de3..395a98b 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/cache/ServiceInventoryCacheEsDAO.java
@@ -88,7 +88,7 @@ public class ServiceInventoryCacheEsDAO extends EsDAO
implements IServiceInvento
}
}
- @Override public List<ServiceInventory> loadLastMappingUpdate() {
+ @Override public List<ServiceInventory> loadLastUpdate(long
lastUpdateTime) {
List<ServiceInventory> serviceInventories = new ArrayList<>();
try {
@@ -96,10 +96,10 @@ public class ServiceInventoryCacheEsDAO extends EsDAO
implements IServiceInvento
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS,
BooleanUtils.TRUE));
-
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceInventory.MAPPING_LAST_UPDATE_TIME).gte(System.currentTimeMillis()
- 30 * 60 * 1000));
+
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceInventory.LAST_UPDATE_TIME).gte(lastUpdateTime));
searchSourceBuilder.query(boolQuery);
- searchSourceBuilder.size(50);
+ searchSourceBuilder.size(500);
SearchResponse response =
getClient().search(ServiceInventory.INDEX_NAME, searchSourceBuilder);
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java
index d390088..d30df13 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2NetworkAddressInventoryCacheDAO.java
@@ -19,11 +19,12 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
-import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
+import java.sql.*;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.register.*;
import
org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author wusheng
@@ -49,4 +50,32 @@ public class H2NetworkAddressInventoryCacheDAO extends
H2SQLExecutor implements
return null;
}
}
+
+ @Override public List<NetworkAddressInventory> loadLastUpdate(long
lastUpdateTime) {
+ List<NetworkAddressInventory> addressInventories = new ArrayList<>();
+
+ try {
+ StringBuilder sql = new StringBuilder("select * from ");
+ sql.append(NetworkAddressInventory.INDEX_NAME);
+ sql.append(" where
").append(NetworkAddressInventory.LAST_UPDATE_TIME).append(">?");
+
+ try (Connection connection = h2Client.getConnection()) {
+ try (ResultSet resultSet = h2Client.executeQuery(connection,
sql.toString(), lastUpdateTime)) {
+ NetworkAddressInventory addressInventory;
+ do {
+ addressInventory =
(NetworkAddressInventory)toStorageData(resultSet,
NetworkAddressInventory.INDEX_NAME, new ServiceInventory.Builder());
+ if (addressInventory != null) {
+ addressInventories.add(addressInventory);
+ }
+ }
+ while (addressInventory != null);
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ return addressInventories;
+ }
}
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java
index a52e216..bf8bd5f 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ServiceInventoryCacheDAO.java
@@ -57,17 +57,17 @@ public class H2ServiceInventoryCacheDAO extends
H2SQLExecutor implements IServic
}
}
- @Override public List<ServiceInventory> loadLastMappingUpdate() {
+ @Override public List<ServiceInventory> loadLastUpdate(long
lastUpdateTime) {
List<ServiceInventory> serviceInventories = new ArrayList<>();
try {
StringBuilder sql = new StringBuilder("select * from ");
sql.append(ServiceInventory.INDEX_NAME);
sql.append(" where
").append(ServiceInventory.IS_ADDRESS).append("=? ");
- sql.append(" and
").append(ServiceInventory.MAPPING_LAST_UPDATE_TIME).append(">?");
+ sql.append(" and
").append(ServiceInventory.LAST_UPDATE_TIME).append(">?");
try (Connection connection = h2Client.getConnection()) {
- try (ResultSet resultSet = h2Client.executeQuery(connection,
sql.toString(), BooleanUtils.TRUE, System.currentTimeMillis() - 30 * 60 *
1000)) {
+ try (ResultSet resultSet = h2Client.executeQuery(connection,
sql.toString(), BooleanUtils.TRUE, lastUpdateTime)) {
ServiceInventory serviceInventory;
do {
serviceInventory =
(ServiceInventory)toStorageData(resultSet, ServiceInventory.INDEX_NAME, new
ServiceInventory.Builder());