This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch bug-fix
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/bug-fix by this push:
new 34ac3fe Fix bug of mesh heartbeat and rest receiver.
34ac3fe is described below
commit 34ac3fe3a2c17a72330775d4cab139686d21afcf
Author: Wu Sheng <[email protected]>
AuthorDate: Thu Nov 1 14:45:59 2018 +0800
Fix bug of mesh heartbeat and rest receiver.
---
.../receiver/mesh/TelemetryDataDispatcher.java | 27 ++++++++++++++++++++++
.../v5/grpc/InstanceDiscoveryServiceHandler.java | 2 +-
.../v5/rest/InstanceHeartBeatServletHandler.java | 14 +++++++++++
.../elasticsearch/query/MetadataQueryEsDAO.java | 1 -
4 files changed, 42 insertions(+), 2 deletions(-)
diff --git a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
index 7fa6b05..7c9e749 100644
--- a/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
+++ b/oap-server/server-receiver-plugin/skywalking-mesh-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/mesh/TelemetryDataDispatcher.java
@@ -18,13 +18,16 @@
package org.apache.skywalking.aop.server.receiver.mesh;
+import java.util.Objects;
import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.apm.network.servicemesh.Protocol;
import org.apache.skywalking.apm.network.servicemesh.ServiceMeshMetric;
import org.apache.skywalking.oap.server.core.CoreModule;
import
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.source.All;
import org.apache.skywalking.oap.server.core.source.DetectPoint;
import org.apache.skywalking.oap.server.core.source.Endpoint;
@@ -36,6 +39,8 @@ import
org.apache.skywalking.oap.server.core.source.ServiceRelation;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* TelemetryDataDispatcher processes the {@link ServiceMeshMetric} format
telemetry data, transfers it to source
@@ -44,11 +49,14 @@ import
org.apache.skywalking.oap.server.library.util.TimeBucketUtils;
* @author wusheng
*/
public class TelemetryDataDispatcher {
+ private static final Logger logger =
LoggerFactory.getLogger(TelemetryDataDispatcher.class);
+
private static MeshDataBufferFileCache CACHE;
private static ServiceInventoryCache SERVICE_CACHE;
private static ServiceInstanceInventoryCache SERVICE_INSTANCE_CACHE;
private static SourceReceiver SOURCE_RECEIVER;
private static IServiceInstanceInventoryRegister
SERVICE_INSTANCE_INVENTORY_REGISTER;
+ private static IServiceInventoryRegister SERVICE_INVENTORY_REGISTER;
private TelemetryDataDispatcher() {
@@ -60,6 +68,7 @@ public class TelemetryDataDispatcher {
SERVICE_INSTANCE_CACHE =
moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
SOURCE_RECEIVER =
moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
SERVICE_INSTANCE_INVENTORY_REGISTER =
moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+ SERVICE_INVENTORY_REGISTER =
moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
}
public static void preProcess(ServiceMeshMetric data) {
@@ -93,8 +102,26 @@ public class TelemetryDataDispatcher {
private static void heartbeat(ServiceMeshMetricDataDecorator decorator,
long minuteTimeBucket) {
ServiceMeshMetric metric = decorator.getMetric();
+
+ // source
SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getSourceServiceInstanceId(),
metric.getEndTime());
+ int instanceId = metric.getSourceServiceInstanceId();
+ ServiceInstanceInventory serviceInstanceInventory =
SERVICE_INSTANCE_CACHE.get(instanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+
SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(),
metric.getEndTime());
+ } else {
+ logger.warn("Can't found service by service instance id from
cache, service instance id is: {}", instanceId);
+ }
+
+ // dest
SERVICE_INSTANCE_INVENTORY_REGISTER.heartbeat(metric.getDestServiceInstanceId(),
metric.getEndTime());
+ instanceId = metric.getDestServiceInstanceId();
+ serviceInstanceInventory = SERVICE_INSTANCE_CACHE.get(instanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+
SERVICE_INVENTORY_REGISTER.heartbeat(serviceInstanceInventory.getServiceId(),
metric.getEndTime());
+ } else {
+ logger.warn("Can't found service by service instance id from
cache, service instance id is: {}", instanceId);
+ }
}
private static void toAll(ServiceMeshMetricDataDecorator decorator, long
minuteTimeBucket) {
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
index 378162e..5da0182 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/grpc/InstanceDiscoveryServiceHandler.java
@@ -88,7 +88,7 @@ public class InstanceDiscoveryServiceHandler extends
InstanceDiscoveryServiceGrp
if (Objects.nonNull(serviceInstanceInventory)) {
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(),
heartBeatTime);
} else {
- logger.warn("Can't found service instance by service instance id
from cache, service instance id is: {}", serviceInstanceId);
+ logger.warn("Can't found service by service instance id from
cache, service instance id is: {}", serviceInstanceId);
}
responseObserver.onNext(Downstream.getDefaultInstance());
diff --git a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
index 1c112c7..df2a7fe 100644
--- a/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
+++ b/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java
@@ -20,9 +20,13 @@ package
org.apache.skywalking.oap.server.receiver.register.provider.handler.v5.r
import com.google.gson.*;
import java.io.IOException;
+import java.util.Objects;
import javax.servlet.http.HttpServletRequest;
import org.apache.skywalking.oap.server.core.CoreModule;
+import
org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.register.ServiceInstanceInventory;
import
org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import
org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.jetty.*;
import org.slf4j.*;
@@ -35,6 +39,8 @@ public class InstanceHeartBeatServletHandler extends
JettyJsonHandler {
private static final Logger logger =
LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class);
private final IServiceInstanceInventoryRegister
serviceInstanceInventoryRegister;
+ private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
+ private final IServiceInventoryRegister serviceInventoryRegister;
private final Gson gson = new Gson();
private static final String INSTANCE_ID = "ii";
@@ -42,6 +48,8 @@ public class InstanceHeartBeatServletHandler extends
JettyJsonHandler {
public InstanceHeartBeatServletHandler(ModuleManager moduleManager) {
this.serviceInstanceInventoryRegister =
moduleManager.find(CoreModule.NAME).getService(IServiceInstanceInventoryRegister.class);
+ this.serviceInstanceInventoryCache =
moduleManager.find(CoreModule.NAME).getService(ServiceInstanceInventoryCache.class);
+ this.serviceInventoryRegister =
moduleManager.find(CoreModule.NAME).getService(IServiceInventoryRegister.class);
}
@Override public String pathSpec() {
@@ -60,6 +68,12 @@ public class InstanceHeartBeatServletHandler extends
JettyJsonHandler {
long heartBeatTime = heartBeat.get(HEARTBEAT_TIME).getAsLong();
serviceInstanceInventoryRegister.heartbeat(instanceId,
heartBeatTime);
+ ServiceInstanceInventory serviceInstanceInventory =
serviceInstanceInventoryCache.get(instanceId);
+ if (Objects.nonNull(serviceInstanceInventory)) {
+
serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(),
heartBeatTime);
+ } else {
+ logger.warn("Can't found service by service instance id from
cache, service instance id is: {}", instanceId);
+ }
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index ab8c727..442381a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -61,7 +61,6 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
- boolQueryBuilder.must().add(timeRangeQueryBuild(startTimestamp,
endTimestamp));
boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.DETECT_POINT,
DetectPoint.SERVER.ordinal()));