This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push: new d7d5812d8a Fix the query time range in the metadata API. (#13332) d7d5812d8a is described below commit d7d5812d8a75f49b4529be377e52add59f52e0d7 Author: Wan Kai <wankai...@foxmail.com> AuthorDate: Thu Jun 19 12:45:02 2025 +0800 Fix the query time range in the metadata API. (#13332) --- docs/en/changes/changes.md | 1 + .../banyandb/measure/BanyanDBMetadataQueryDAO.java | 40 +++++++++++++++++----- .../elasticsearch/query/MetadataQueryEsDAO.java | 13 +++---- .../jdbc/common/dao/JDBCMetadataQueryDAO.java | 11 ++++-- 4 files changed, 49 insertions(+), 16 deletions(-) diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md index b568abb716..6c69b4db64 100644 --- a/docs/en/changes/changes.md +++ b/docs/en/changes/changes.md @@ -35,6 +35,7 @@ * refactor: implement OTEL handler with SPI for extensibility. * chore: add `toString` implementation for `StorageID`. * chore: add a warning log when connecting to ES takes too long. +* Fix the query time range in the metadata API. #### UI diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java index 78938c649e..a2bb2ff05d 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java @@ -114,6 +114,8 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe public List<ServiceInstance> listInstances(Duration duration, String serviceId) throws IOException { TimestampRange timestampRange = null; if (duration != null) { + // The data time should <= endTimeBucket. + // It's equals to the condition `query.and(lte(InstanceTraffic.TIME_BUCKET, endTimeBucket))` timestampRange = new TimestampRange(0, duration.getEndTimestamp()); } MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute); @@ -127,8 +129,8 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe if (StringUtil.isNotEmpty(serviceId)) { query.and(eq(InstanceTraffic.SERVICE_ID, serviceId)); } - final var minuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp()); - query.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, minuteTimeBucket)); + final var startTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp()); + query.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET, startTimeBucket)); query.limit(limit); } }); @@ -184,9 +186,16 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe @Override public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit, Duration duration) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EndpointTraffic.INDEX_NAME, DownSampling.Minute); + TimestampRange timestampRange = null; + if (duration != null) { + // The data time should <= endTimeBucket. + // It's equals to the condition `query.and(lte(EndpointTraffic.TIME_BUCKET, endTimeBucket))` + timestampRange = new TimestampRange(0, duration.getEndTimestamp()); + } MeasureQueryResponse resp = query(false, schema, ENDPOINT_TRAFFIC_TAGS, Collections.emptySet(), + timestampRange, new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { @@ -201,9 +210,7 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe } if (duration != null) { final var startTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp()); - final var endTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp()); query.and(gte(EndpointTraffic.LAST_PING_TIME_BUCKET, startTimeBucket)); - query.and(lte(EndpointTraffic.LAST_PING_TIME_BUCKET, endTimeBucket)); } query.setOrderBy(new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC)); query.limit(limit); @@ -220,9 +227,16 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe @Override public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + TimestampRange timestampRange = null; + if (lastPingEndTimeBucket > 0) { + // The data time should <= endTimeBucket. + // It's equals to the condition `query.and(lte(ProcessTraffic.TIME_BUCKET, endTimeBucket))` + timestampRange = new TimestampRange(0, TimeBucket.getTimestamp(lastPingEndTimeBucket)); + } MeasureQueryResponse resp = query(false, schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), + timestampRange, new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { @@ -230,9 +244,6 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe if (lastPingStartTimeBucket > 0) { query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket)); } - if (lastPingEndTimeBucket > 0) { - query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket)); - } if (supportStatus != null) { query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value())); } @@ -252,10 +263,17 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe @Override public List<Process> listProcesses(String serviceInstanceId, Duration duration, boolean includeVirtual) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + TimestampRange timestampRange = null; + if (duration != null) { + // The data time should <= endTimeBucket. + // It's equals to the condition `query.and(lte(ProcessTraffic.TIME_BUCKET, endTimeBucket))` + timestampRange = new TimestampRange(0, duration.getEndTimestamp()); + } long lastPingStartTimeBucket = duration.getStartTimeBucket(); MeasureQueryResponse resp = query(false, schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), + timestampRange, new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { @@ -279,15 +297,21 @@ public class BanyanDBMetadataQueryDAO extends AbstractBanyanDBDAO implements IMe @Override public List<Process> listProcesses(String agentId, long startPingTimeBucket, long endPingTimeBucket) throws IOException { MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(ProcessTraffic.INDEX_NAME, DownSampling.Minute); + TimestampRange timestampRange = null; + if (endPingTimeBucket > 0) { + // The data time should <= endTimeBucket. + // It's equals to the condition `query.and(lte(ProcessTraffic.TIME_BUCKET, endTimeBucket))` + timestampRange = new TimestampRange(0, TimeBucket.getTimestamp(endPingTimeBucket)); + } MeasureQueryResponse resp = query(false, schema, PROCESS_TRAFFIC_TAGS, Collections.emptySet(), + timestampRange, new QueryBuilder<MeasureQuery>() { @Override protected void apply(MeasureQuery query) { query.and(eq(ProcessTraffic.AGENT_ID, agentId)); query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, startPingTimeBucket)); - query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, endPingTimeBucket)); query.and(ne(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value())); query.limit(limit); } diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java index a6a25f87c0..0220b9f9c2 100644 --- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java +++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java @@ -31,7 +31,6 @@ import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder; import org.apache.skywalking.library.elasticsearch.requests.search.Query; -import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder; import org.apache.skywalking.library.elasticsearch.requests.search.Search; import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder; import org.apache.skywalking.library.elasticsearch.requests.search.Sort; @@ -161,7 +160,7 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp()); final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp()); query.must(Query.range(InstanceTraffic.LAST_PING_TIME_BUCKET).gte(startMinuteTimeBucket)) - .must(Query.range(InstanceTraffic.TIME_BUCKET).lt(endMinuteTimeBucket)); + .must(Query.range(InstanceTraffic.TIME_BUCKET).lte(endMinuteTimeBucket)); } if (IndexController.LogicIndicesRegister.isMergedTable(InstanceTraffic.INDEX_NAME)) { query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, InstanceTraffic.INDEX_NAME)); @@ -237,7 +236,8 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { if (duration != null) { final long startMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getStartTimestamp()); final long endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp()); - query.must(Query.range(EndpointTraffic.LAST_PING_TIME_BUCKET).gte(startMinuteTimeBucket).lte(endMinuteTimeBucket)); + query.must(Query.range(EndpointTraffic.LAST_PING_TIME_BUCKET).gte(startMinuteTimeBucket)) + .must(Query.range(EndpointTraffic.TIME_BUCKET).lte(endMinuteTimeBucket)); } final var search = Search.builder().query(query).size(limit).sort( @@ -389,9 +389,10 @@ public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { query.must(Query.term(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); } if (lastPingStartTimeBucket > 0) { - final RangeQueryBuilder rangeQuery = Query.range(ProcessTraffic.LAST_PING_TIME_BUCKET); - rangeQuery.gte(lastPingStartTimeBucket); - query.must(rangeQuery); + query.must(Query.range(ProcessTraffic.LAST_PING_TIME_BUCKET).gte(lastPingStartTimeBucket)); + } + if (lastPingEndTimeBucket > 0) { + query.must(Query.range(ProcessTraffic.TIME_BUCKET).lte(lastPingEndTimeBucket)); } if (!includeVirtual) { query.mustNot(Query.term(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value())); diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java index ec0f7f87de..8dbb702a32 100644 --- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java +++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java @@ -139,7 +139,7 @@ public class JDBCMetadataQueryDAO implements IMetadataQueryDAO { final var endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp()); sql.append(" and ").append(InstanceTraffic.LAST_PING_TIME_BUCKET).append(" >= ?"); parameters.add(startMinuteTimeBucket); - sql.append(" and ").append(InstanceTraffic.TIME_BUCKET).append(" < ?"); + sql.append(" and ").append(InstanceTraffic.TIME_BUCKET).append(" <= ?"); parameters.add(endMinuteTimeBucket); } sql.append(" and ").append(InstanceTraffic.SERVICE_ID).append("=?"); @@ -228,7 +228,7 @@ public class JDBCMetadataQueryDAO implements IMetadataQueryDAO { final var endMinuteTimeBucket = TimeBucket.getMinuteTimeBucket(duration.getEndTimestamp()); sql.append(" and ").append(EndpointTraffic.LAST_PING_TIME_BUCKET).append(" >= ?"); condition.add(startMinuteTimeBucket); - sql.append(" and ").append(EndpointTraffic.LAST_PING_TIME_BUCKET).append(" <= ?"); + sql.append(" and ").append(EndpointTraffic.TIME_BUCKET).append(" <= ?"); condition.add(endMinuteTimeBucket); } sql.append(" order by ").append(EndpointTraffic.TIME_BUCKET).append(" desc"); @@ -493,6 +493,13 @@ public class JDBCMetadataQueryDAO implements IMetadataQueryDAO { sql.append(ProcessTraffic.LAST_PING_TIME_BUCKET).append(">=?"); condition.add(lastPingStartTimeBucket); } + if (lastPingEndTimeBucket > 0) { + if (!condition.isEmpty()) { + sql.append(" and "); + } + sql.append(ProcessTraffic.TIME_BUCKET).append("<=?"); + condition.add(lastPingEndTimeBucket); + } if (!includeVirtual) { if (!condition.isEmpty()) { sql.append(" and ");