This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 4103551 bugfix: Envoy error logs are not persisted when no metrics
are generated (#6911)
4103551 is described below
commit 4103551092b51f733693daa1e7f3e9aba95141e5
Author: Zhenxu Ke <[email protected]>
AuthorDate: Fri May 7 23:25:20 2021 +0800
bugfix: Envoy error logs are not persisted when no metrics are generated
(#6911)
---
CHANGES.md | 3 +-
.../src/main/resources/lal/envoy-als.yaml | 4 +-
.../envoy/AccessLogServiceGRPCHandler.java | 14 ++++--
.../receiver/envoy/als/AccessLogAnalyzer.java | 20 ++++++++-
.../envoy/als/LogEntry2MetricsAdapter.java | 2 +-
.../als/k8s/K8sALSServiceMeshHTTPAnalysis.java | 31 +++++++-------
.../envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java | 27 ++++++------
.../envoy/als/tcp/TCPLogEntry2MetricsAdapter.java | 2 +-
.../als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java | 33 +++++++-------
.../tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java | 27 ++++++------
.../envoy/persistence/LogsPersistence.java | 40 ++++++-----------
...ogsPersistence.java => TCPLogsPersistence.java} | 50 ++++++++--------------
...ver.receiver.envoy.als.tcp.TCPAccessLogAnalyzer | 1 +
.../als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java | 29 ++++++-------
14 files changed, 140 insertions(+), 143 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index f80e5cd..cee0732 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -36,7 +36,8 @@ Release Notes.
* CVE: fix Jetty vulnerability. https://nvd.nist.gov/vuln/detail/CVE-2019-17638
* Fix: MAL function would miss samples name after creating new samples.
* perf: use iterator.remove() to remove modulesWithoutProvider
-* Support analyzing Envoy TCP access logs.
+* Support analyzing Envoy TCP access logs and persist error TCP logs.
+* Fix: Envoy error logs are not persisted when no metrics are generated
#### UI
* Add logo for kong plugin.
diff --git a/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml
b/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml
index 4be3213..96a7c1a 100644
--- a/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml
+++ b/oap-server/server-bootstrap/src/main/resources/lal/envoy-als.yaml
@@ -24,7 +24,9 @@ rules:
abort {}
}
extractor {
- tag 'status.code': parsed?.response?.responseCode as int
+ if (parsed?.response?.responseCode) {
+ tag 'status.code': parsed?.response?.responseCode as int
+ }
tag 'response.flag':
parsed?.commonProperties?.responseFlags?.keySet()
}
sink {
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
index f3bc5ef..14c052e 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java
@@ -31,7 +31,9 @@ import
org.apache.skywalking.aop.server.receiver.mesh.TelemetryDataDispatcher;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
@@ -129,11 +131,13 @@ public class AccessLogServiceGRPCHandler extends
AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.HTTPAccessLogEntries logs
= message.getHttpLogs();
for (final HTTPAccessLogEntry log :
logs.getLogEntryList()) {
- List<ServiceMeshMetric.Builder> result = new
ArrayList<>();
+ AccessLogAnalyzer.Result result =
AccessLogAnalyzer.Result.builder().build();
for (ALSHTTPAnalysis analysis :
envoyHTTPAnalysisList) {
result = analysis.analysis(result,
identifier, log, role);
}
- sourceResult.addAll(result);
+ if
(CollectionUtils.isNotEmpty(result.getMetrics())) {
+ sourceResult.addAll(result.getMetrics());
+ }
}
break;
@@ -141,11 +145,13 @@ public class AccessLogServiceGRPCHandler extends
AccessLogServiceGrpc.AccessLogS
StreamAccessLogsMessage.TCPAccessLogEntries
tcpLogs = message.getTcpLogs();
for (final TCPAccessLogEntry tcpLog :
tcpLogs.getLogEntryList()) {
- List<ServiceMeshMetric.Builder> result = new
ArrayList<>();
+ AccessLogAnalyzer.Result result =
AccessLogAnalyzer.Result.builder().build();
for (TCPAccessLogAnalyzer analyzer :
envoyTCPAnalysisList) {
result = analyzer.analysis(result,
identifier, tcpLog, role);
}
- sourceResult.addAll(result);
+ if
(CollectionUtils.isNotEmpty(result.getMetrics())) {
+ sourceResult.addAll(result.getMetrics());
+ }
}
break;
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
index 4dcf883..36ea04c 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/AccessLogAnalyzer.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.server.receiver.envoy.als;
import io.envoyproxy.envoy.config.core.v3.Node;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.List;
+import lombok.Builder;
+import lombok.Data;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -42,8 +44,8 @@ public interface AccessLogAnalyzer<E> {
* @param role the role of the Envoy node where the logs are emitted.
* @return the analysis results.
*/
- List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
final E entry,
final Role role
@@ -65,4 +67,18 @@ public interface AccessLogAnalyzer<E> {
}
return defaultRole;
}
+
+ @Data
+ @Builder
+ class Result {
+ /**
+ * The service representing the Envoy node.
+ */
+ private ServiceMetaInfo service;
+
+ /**
+ * The analyzed metrics result.
+ */
+ private List<ServiceMeshMetric.Builder> metrics;
+ }
}
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
index 7b6f8fb..63da419 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/LogEntry2MetricsAdapter.java
@@ -91,7 +91,7 @@ public class LogEntry2MetricsAdapter {
.setDetectPoint(DetectPoint.client);
}
- protected ServiceMeshMetric.Builder adaptCommonPart() {
+ public ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties();
final String endpoint = endpoint();
int responseCode = entry.getResponse().getResponseCode().getValue();
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
index e761ff7..f521a3e 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8sALSServiceMeshHTTPAnalysis.java
@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -60,17 +59,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends
AbstractALSAnalyzer {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
+ if (isNotEmpty(result.getMetrics())) {
return result;
}
if (serviceRegistry.isEmpty()) {
- return Collections.emptyList();
+ return Result.builder().build();
}
switch (role) {
case PROXY:
@@ -79,17 +78,17 @@ public class K8sALSServiceMeshHTTPAnalysis extends
AbstractALSAnalyzer {
return analyzeSideCar(entry);
}
- return Collections.emptyList();
+ return Result.builder().build();
}
- protected List<ServiceMeshMetric.Builder> analyzeSideCar(final
HTTPAccessLogEntry entry) {
+ protected Result analyzeSideCar(final HTTPAccessLogEntry entry) {
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final AccessLogCommon properties = entry.getCommonProperties();
final String cluster = properties.getUpstreamCluster();
if (isBlank(cluster)) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
@@ -101,7 +100,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends
AbstractALSAnalyzer {
final ServiceMetaInfo downstreamService =
find(downstreamRemoteAddress.getSocketAddress().getAddress());
final Address downstreamLocalAddress =
properties.getDownstreamLocalAddress();
if (!isValid(downstreamRemoteAddress) ||
!isValid(downstreamLocalAddress)) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final ServiceMetaInfo localService =
find(downstreamLocalAddress.getSocketAddress().getAddress());
@@ -125,7 +124,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends
AbstractALSAnalyzer {
// sidecar(client side) -> sidecar
final Address upstreamRemoteAddress =
properties.getUpstreamRemoteAddress();
if (!isValid(upstreamRemoteAddress)) {
- return sources;
+ return
Result.builder().metrics(sources).service(localService).build();
}
final ServiceMetaInfo destService =
find(upstreamRemoteAddress.getSocketAddress().getAddress());
@@ -135,12 +134,12 @@ public class K8sALSServiceMeshHTTPAnalysis extends
AbstractALSAnalyzer {
sources.add(metric);
}
- return sources;
+ return Result.builder().metrics(sources).service(localService).build();
}
- protected List<ServiceMeshMetric.Builder> analyzeProxy(final
HTTPAccessLogEntry entry) {
+ protected Result analyzeProxy(final HTTPAccessLogEntry entry) {
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final AccessLogCommon properties = entry.getCommonProperties();
final Address downstreamLocalAddress =
properties.getDownstreamLocalAddress();
@@ -148,7 +147,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends
AbstractALSAnalyzer {
properties.getDownstreamDirectRemoteAddress() :
properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress =
properties.getUpstreamRemoteAddress();
if (!isValid(downstreamLocalAddress) ||
!isValid(downstreamRemoteAddress) || !isValid(upstreamRemoteAddress)) {
- return Collections.emptyList();
+ return Result.builder().build();
}
final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
@@ -175,7 +174,7 @@ public class K8sALSServiceMeshHTTPAnalysis extends
AbstractALSAnalyzer {
log.debug("Transformed ingress outbound mesh metric {}",
outboundMetric);
result.add(outboundMetric);
- return result;
+ return Result.builder().metrics(result).service(ingress).build();
}
/**
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
index 0cb752e..81a3d10 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/mx/MetaExchangeALSHTTPAnalyzer.java
@@ -23,8 +23,8 @@ import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,31 +68,32 @@ public class MetaExchangeALSHTTPAnalyzer extends
AbstractALSAnalyzer {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
- return result;
+ if (isNotEmpty(previousResult.getMetrics())) {
+ return previousResult;
}
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
- }
- final AccessLogCommon properties = entry.getCommonProperties();
- final Map<String, Any> stateMap =
properties.getFilterStateObjectsMap();
- if (stateMap.isEmpty()) {
- return Collections.emptyList();
+ return previousResult;
}
final ServiceMetaInfo currSvc;
try {
currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from
identifier.node.metadata. ", e);
- return Collections.emptyList();
+ return previousResult;
+ }
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final Map<String, Any> stateMap =
properties.getFilterStateObjectsMap();
+ if (stateMap.isEmpty()) {
+ return Result.builder().service(currSvc).build();
}
+ final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
@@ -131,7 +132,7 @@ public class MetaExchangeALSHTTPAnalyzer extends
AbstractALSAnalyzer {
}
result.add(metric);
}
- return result;
+ return Result.builder().metrics(result).service(currSvc).build();
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws
Exception {
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
index e7d60d1..4a1882b 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/TCPLogEntry2MetricsAdapter.java
@@ -85,7 +85,7 @@ public class TCPLogEntry2MetricsAdapter {
.setDetectPoint(DetectPoint.client);
}
- protected ServiceMeshMetric.Builder adaptCommonPart() {
+ public ServiceMeshMetric.Builder adaptCommonPart() {
final AccessLogCommon properties = entry.getCommonProperties();
final ConnectionProperties connectionProperties =
entry.getConnectionProperties();
final String tlsMode = parseTLS(properties.getTlsProperties());
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
index c3a8f36..fffbd83 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/k8s/K8sALSServiceMeshTCPAnalysis.java
@@ -24,7 +24,6 @@ import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -59,36 +58,36 @@ public class K8sALSServiceMeshTCPAnalysis extends
AbstractTCPAccessLogAnalyzer {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
- return result;
+ if (isNotEmpty(previousResult.getMetrics())) {
+ return previousResult;
}
if (serviceRegistry.isEmpty()) {
- return Collections.emptyList();
+ return previousResult;
}
switch (role) {
case PROXY:
- return analyzeProxy(entry);
+ return analyzeProxy(previousResult, entry);
case SIDECAR:
- return analyzeSideCar(entry);
+ return analyzeSideCar(previousResult, entry);
}
- return Collections.emptyList();
+ return previousResult;
}
- protected List<ServiceMeshMetric.Builder> analyzeSideCar(final
TCPAccessLogEntry entry) {
+ protected Result analyzeSideCar(final Result previousResult, final
TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
- return Collections.emptyList();
+ return previousResult;
}
final String cluster = properties.getUpstreamCluster();
if (cluster == null) {
- return Collections.emptyList();
+ return previousResult;
}
final List<ServiceMeshMetric.Builder> sources = new ArrayList<>();
@@ -128,20 +127,20 @@ public class K8sALSServiceMeshTCPAnalysis extends
AbstractTCPAccessLogAnalyzer {
sources.add(metric);
}
- return sources;
+ return Result.builder().metrics(sources).service(localService).build();
}
- protected List<ServiceMeshMetric.Builder> analyzeProxy(final
TCPAccessLogEntry entry) {
+ protected Result analyzeProxy(final Result previousResult, final
TCPAccessLogEntry entry) {
final AccessLogCommon properties = entry.getCommonProperties();
if (properties == null) {
- return Collections.emptyList();
+ return previousResult;
}
final Address downstreamLocalAddress =
properties.getDownstreamLocalAddress();
final Address downstreamRemoteAddress =
properties.hasDownstreamDirectRemoteAddress() ?
properties.getDownstreamDirectRemoteAddress() :
properties.getDownstreamRemoteAddress();
final Address upstreamRemoteAddress =
properties.getUpstreamRemoteAddress();
if (downstreamLocalAddress == null || downstreamRemoteAddress == null
|| upstreamRemoteAddress == null) {
- return Collections.emptyList();
+ return previousResult;
}
final List<ServiceMeshMetric.Builder> result = new ArrayList<>(2);
@@ -168,7 +167,7 @@ public class K8sALSServiceMeshTCPAnalysis extends
AbstractTCPAccessLogAnalyzer {
log.debug("Transformed ingress outbound mesh metric {}",
outboundMetric);
result.add(outboundMetric);
- return result;
+ return Result.builder().metrics(result).service(ingress).build();
}
/**
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
index 1f4cdcc..0ac1db6 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/als/tcp/mx/MetaExchangeTCPAccessLogAnalyzer.java
@@ -23,8 +23,8 @@ import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.AccessLogCommon;
import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
+import java.util.ArrayList;
import java.util.Base64;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -67,31 +67,32 @@ public class MetaExchangeTCPAccessLogAnalyzer extends
AbstractTCPAccessLogAnalyz
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result previousResult,
final StreamAccessLogsMessage.Identifier identifier,
final TCPAccessLogEntry entry,
final Role role
) {
- if (isNotEmpty(result)) {
- return result;
+ if (isNotEmpty(previousResult.getMetrics())) {
+ return previousResult;
}
if (!entry.hasCommonProperties()) {
- return Collections.emptyList();
- }
- final AccessLogCommon properties = entry.getCommonProperties();
- final Map<String, Any> stateMap =
properties.getFilterStateObjectsMap();
- if (stateMap.isEmpty()) {
- return Collections.emptyList();
+ return previousResult;
}
final ServiceMetaInfo currSvc;
try {
currSvc = adaptToServiceMetaInfo(identifier);
} catch (Exception e) {
log.error("Failed to inflate the ServiceMetaInfo from
identifier.node.metadata. ", e);
- return Collections.emptyList();
+ return previousResult;
+ }
+ final AccessLogCommon properties = entry.getCommonProperties();
+ final Map<String, Any> stateMap =
properties.getFilterStateObjectsMap();
+ if (stateMap.isEmpty()) {
+ return Result.builder().service(currSvc).build();
}
+ final List<ServiceMeshMetric.Builder> result = new ArrayList<>();
final AtomicBoolean downstreamExists = new AtomicBoolean();
stateMap.forEach((key, value) -> {
if (!key.equals(UPSTREAM_KEY) && !key.equals(DOWNSTREAM_KEY)) {
@@ -130,7 +131,7 @@ public class MetaExchangeTCPAccessLogAnalyzer extends
AbstractTCPAccessLogAnalyz
}
result.add(metric);
}
- return result;
+ return Result.builder().metrics(result).service(currSvc).build();
}
protected ServiceMetaInfo adaptToServiceMetaInfo(final Any value) throws
Exception {
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
index 132911b..4f1e266 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.persistence;
-import com.google.protobuf.TextFormat;
import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
-import java.io.IOException;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
@@ -35,7 +31,9 @@ import
org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
+import
org.apache.skywalking.oap.server.receiver.envoy.als.LogEntry2MetricsAdapter;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import static
org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
@@ -59,27 +57,15 @@ public class LogsPersistence implements ALSHTTPAnalysis {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
final HTTPAccessLogEntry entry,
final Role role
) {
try {
- result.stream()
- .findFirst()
- .ifPresent(metrics -> {
- try {
- final LogData logData = convertToLogData(entry,
metrics);
- logAnalyzerService.doAnalysis(logData);
- } catch (IOException e) {
- log.error(
- "Failed to parse error log entry to log data:
{}",
- TextFormat.shortDebugString(entry),
- e
- );
- }
- });
+ final LogData logData = convertToLogData(entry, result);
+ logAnalyzerService.doAnalysis(logData);
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
@@ -92,16 +78,16 @@ public class LogsPersistence implements ALSHTTPAnalysis {
}
public LogData convertToLogData(final HTTPAccessLogEntry logEntry,
- final ServiceMeshMetric.Builder metrics)
throws IOException {
- final boolean isServerSide = metrics.getDetectPoint() ==
DetectPoint.server;
- final String svc = isServerSide ? metrics.getDestServiceName() :
metrics.getSourceServiceName();
- final String svcInst = isServerSide ? metrics.getDestServiceInstance()
: metrics.getSourceServiceInstance();
+ final Result result) throws Exception {
+
+ final ServiceMetaInfo service = result.getService();
+
+ final ServiceMeshMetric.Builder metrics = new
LogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData
.newBuilder()
- .setService(svc)
- .setServiceInstance(svcInst)
- .setEndpoint(metrics.getEndpoint())
+ .setService(service.getServiceName())
+ .setServiceInstance(service.getServiceInstanceName())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/TCPLogsPersistence.java
similarity index 64%
copy from
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
copy to
oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/TCPLogsPersistence.java
index 132911b..8d3d0b6 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/LogsPersistence.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/persistence/TCPLogsPersistence.java
@@ -18,13 +18,9 @@
package org.apache.skywalking.oap.server.receiver.envoy.persistence;
-import com.google.protobuf.TextFormat;
-import io.envoyproxy.envoy.data.accesslog.v3.HTTPAccessLogEntry;
+import io.envoyproxy.envoy.data.accesslog.v3.TCPAccessLogEntry;
import io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
-import java.io.IOException;
-import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.logging.v3.JSONLog;
import org.apache.skywalking.apm.network.logging.v3.LogData;
import org.apache.skywalking.apm.network.logging.v3.LogDataBody;
@@ -34,8 +30,10 @@ import
org.apache.skywalking.oap.log.analyzer.provider.log.ILogAnalyzerService;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
-import org.apache.skywalking.oap.server.receiver.envoy.als.ALSHTTPAnalysis;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
+import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
+import
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer;
+import
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPLogEntry2MetricsAdapter;
import static
org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.toJSON;
@@ -43,7 +41,7 @@ import static
org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils.to
* {@code LogsPersistence} analyzes the error logs and persists them to the
log system.
*/
@Slf4j
-public class LogsPersistence implements ALSHTTPAnalysis {
+public class TCPLogsPersistence implements TCPAccessLogAnalyzer {
private ILogAnalyzerService logAnalyzerService;
@Override
@@ -59,27 +57,15 @@ public class LogsPersistence implements ALSHTTPAnalysis {
}
@Override
- public List<ServiceMeshMetric.Builder> analysis(
- final List<ServiceMeshMetric.Builder> result,
+ public Result analysis(
+ final Result result,
final StreamAccessLogsMessage.Identifier identifier,
- final HTTPAccessLogEntry entry,
+ final TCPAccessLogEntry entry,
final Role role
) {
try {
- result.stream()
- .findFirst()
- .ifPresent(metrics -> {
- try {
- final LogData logData = convertToLogData(entry,
metrics);
- logAnalyzerService.doAnalysis(logData);
- } catch (IOException e) {
- log.error(
- "Failed to parse error log entry to log data:
{}",
- TextFormat.shortDebugString(entry),
- e
- );
- }
- });
+ final LogData logData = convertToLogData(entry, result);
+ logAnalyzerService.doAnalysis(logData);
} catch (final Exception e) {
log.error("Failed to persist Envoy access log", e);
}
@@ -91,17 +77,17 @@ public class LogsPersistence implements ALSHTTPAnalysis {
return prev;
}
- public LogData convertToLogData(final HTTPAccessLogEntry logEntry,
- final ServiceMeshMetric.Builder metrics)
throws IOException {
- final boolean isServerSide = metrics.getDetectPoint() ==
DetectPoint.server;
- final String svc = isServerSide ? metrics.getDestServiceName() :
metrics.getSourceServiceName();
- final String svcInst = isServerSide ? metrics.getDestServiceInstance()
: metrics.getSourceServiceInstance();
+ public LogData convertToLogData(final TCPAccessLogEntry logEntry,
+ final Result result) throws Exception {
+
+ final ServiceMetaInfo service = result.getService();
+
+ final ServiceMeshMetric.Builder metrics = new
TCPLogEntry2MetricsAdapter(logEntry, null, null).adaptCommonPart();
return LogData
.newBuilder()
- .setService(svc)
- .setServiceInstance(svcInst)
- .setEndpoint(metrics.getEndpoint())
+ .setService(service.getServiceName())
+ .setServiceInstance(service.getServiceInstanceName())
.setTimestamp(metrics.getEndTime())
.setBody(
LogDataBody
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
index 5290deb..e207746 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/resources/META-INF/services/org.apache.skywalking.oap.server.receiver.envoy.als.tcp.TCPAccessLogAnalyzer
@@ -18,3 +18,4 @@
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.k8s.K8sALSServiceMeshTCPAnalysis
org.apache.skywalking.oap.server.receiver.envoy.als.tcp.mx.MetaExchangeTCPAccessLogAnalyzer
+org.apache.skywalking.oap.server.receiver.envoy.persistence.TCPLogsPersistence
diff --git
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
index 2ebf581..abe7073 100644
---
a/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
+++
b/oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/test/java/org/apache/skywalking/oap/server/receiver/envoy/als/k8s/K8SALSServiceMeshHTTPAnalysisTest.java
@@ -23,13 +23,12 @@ import
io.envoyproxy.envoy.service.accesslog.v3.StreamAccessLogsMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.skywalking.apm.network.common.v3.DetectPoint;
import org.apache.skywalking.apm.network.servicemesh.v3.ServiceMeshMetric;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import
org.apache.skywalking.oap.server.receiver.envoy.EnvoyMetricReceiverConfig;
import
org.apache.skywalking.oap.server.receiver.envoy.MetricServiceGRPCHandlerTestMain;
+import org.apache.skywalking.oap.server.receiver.envoy.als.AccessLogAnalyzer;
import org.apache.skywalking.oap.server.receiver.envoy.als.Role;
import org.apache.skywalking.oap.server.receiver.envoy.als.ServiceMetaInfo;
import org.junit.Assert;
@@ -78,16 +77,16 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder =
StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result =
this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(),
requestBuilder.getHttpLogs().getLogEntry(0), Role.PROXY);
+ AccessLogAnalyzer.Result result =
this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(),
requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0),
Role.PROXY);
- Assert.assertEquals(2, result.size());
+ Assert.assertEquals(2, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("UNKNOWN", incoming.getSourceServiceName());
Assert.assertEquals("ingress", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
- ServiceMeshMetric.Builder outgoing = result.get(1);
+ ServiceMeshMetric.Builder outgoing = result.getMetrics().get(1);
Assert.assertEquals("ingress", outgoing.getSourceServiceName());
Assert.assertEquals("productpage", outgoing.getDestServiceName());
Assert.assertEquals(DetectPoint.client, outgoing.getDetectPoint());
@@ -100,11 +99,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder =
StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result =
this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(),
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+ AccessLogAnalyzer.Result result =
this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(),
requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0),
Role.SIDECAR);
- Assert.assertEquals(1, result.size());
+ Assert.assertEquals(1, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("", incoming.getSourceServiceName());
Assert.assertEquals("productpage", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -117,11 +116,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder =
StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result =
this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(),
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+ AccessLogAnalyzer.Result result =
this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(),
requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0),
Role.SIDECAR);
- Assert.assertEquals(1, result.size());
+ Assert.assertEquals(1, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("productpage",
incoming.getSourceServiceName());
Assert.assertEquals("review", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.server, incoming.getDetectPoint());
@@ -134,11 +133,11 @@ public class K8SALSServiceMeshHTTPAnalysisTest {
StreamAccessLogsMessage.Builder requestBuilder =
StreamAccessLogsMessage.newBuilder();
JsonFormat.parser().merge(isr, requestBuilder);
- List<ServiceMeshMetric.Builder> result =
this.analysis.analysis(new ArrayList<>(), requestBuilder.getIdentifier(),
requestBuilder.getHttpLogs().getLogEntry(0), Role.SIDECAR);
+ AccessLogAnalyzer.Result result =
this.analysis.analysis(AccessLogAnalyzer.Result.builder().build(),
requestBuilder.getIdentifier(), requestBuilder.getHttpLogs().getLogEntry(0),
Role.SIDECAR);
- Assert.assertEquals(1, result.size());
+ Assert.assertEquals(1, result.getMetrics().size());
- ServiceMeshMetric.Builder incoming = result.get(0);
+ ServiceMeshMetric.Builder incoming = result.getMetrics().get(0);
Assert.assertEquals("productpage",
incoming.getSourceServiceName());
Assert.assertEquals("detail", incoming.getDestServiceName());
Assert.assertEquals(DetectPoint.client, incoming.getDetectPoint());