This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/banyandb-integration-stream by
this push:
new ba7e9592b8 adapt client and support log query
ba7e9592b8 is described below
commit ba7e9592b8ed3e1937f53eace0cb073c1cf1b067
Author: Megrez Lu <[email protected]>
AuthorDate: Tue May 3 23:18:49 2022 +0800
adapt client and support log query
---
.../storage/plugin/banyandb/BanyanDBConverter.java | 2 +-
.../storage/plugin/banyandb/MetadataRegistry.java | 7 +--
.../banyandb/measure/BanyanDBMetadataQueryDAO.java | 66 +++++++++-------------
.../banyandb/stream/BanyanDBAlarmQueryDAO.java | 4 +-
.../stream/BanyanDBBrowserLogQueryDAO.java | 8 +--
.../banyandb/stream/BanyanDBLogQueryDAO.java | 26 +++++----
.../BanyanDBProfileThreadSnapshotQueryDAO.java | 21 ++++---
.../banyandb/stream/BanyanDBTraceQueryDAO.java | 18 +++---
8 files changed, 72 insertions(+), 80 deletions(-)
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index 5c9071f8b0..2b2a197cf5 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -145,7 +145,7 @@ public class BanyanDBConverter {
public void acceptID(String id) {
try {
- this.measureWrite.tag(MetadataRegistry.ID,
TagAndValue.idTagValue(id));
+ this.measureWrite.setID(id);
} catch (BanyanDBException ex) {
log.error("fail to add ID tag", ex);
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index d3284bdf1b..6ee58c5283 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -67,7 +67,6 @@ import java.util.stream.Collectors;
public enum MetadataRegistry {
INSTANCE;
- public static final String ID = "id";
private final Map<String, Schema> registry = new ConcurrentHashMap<>();
public NamedSchema<?> registerModel(Model model, ConfigService
configService) {
@@ -102,7 +101,7 @@ public enum MetadataRegistry {
final Measure.Builder builder =
Measure.create(partialMetadata.getGroup(), partialMetadata.getName(),
downSamplingDuration(model.getDownsampling()));
if (entities.isEmpty()) { // if shardingKeys is empty, for
measure, we can use ID as a single sharding key.
- builder.setEntityRelativeTags(ID);
+ builder.setEntityRelativeTags(Measure.ID);
} else {
builder.setEntityRelativeTags(entities);
}
@@ -112,8 +111,6 @@ public enum MetadataRegistry {
Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt =
ValueColumnMetadata.INSTANCE
.readValueColumnDefinition(model.getName());
valueColumnOpt.ifPresent(valueColumn ->
builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()),
valueColumn)));
- // register ID
- schemaBuilder.spec(ID, new ColumnSpec(ColumnType.TAG,
String.class));
registry.put(model.getName(), schemaBuilder.build());
return builder.build();
@@ -336,7 +333,7 @@ public enum MetadataRegistry {
if (this.getKind() == Kind.MEASURE &&
entry.getKey().equals(this.indexFamily())) {
// append measure ID, but it should not generate an index
in the client side.
// BanyanDB will take care of the ID index registration.
- b.addTagSpec(TagFamilySpec.TagSpec.newIDTag(ID));
+ b.addIDTagSpec();
}
tagFamilySpecs.add(b.build());
}
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index e6b42d4f2d..2756d56e59 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -26,7 +26,6 @@ import com.google.gson.JsonObject;
import org.apache.skywalking.banyandb.v1.client.DataPoint;
import org.apache.skywalking.banyandb.v1.client.MeasureQuery;
import org.apache.skywalking.banyandb.v1.client.MeasureQueryResponse;
-import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.Layer;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
@@ -45,7 +44,6 @@ import
org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import
org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractBanyanDBDAO;
import java.io.IOException;
@@ -72,16 +70,15 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
ServiceTraffic.SHORT_NAME,
ServiceTraffic.GROUP,
ServiceTraffic.LAYER,
- ServiceTraffic.SERVICE_ID,
- MetadataRegistry.ID),
+ ServiceTraffic.SERVICE_ID),
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(group)) {
- query.appendCondition(eq(ServiceTraffic.GROUP,
group));
+ query.and(eq(ServiceTraffic.GROUP, group));
}
if (StringUtil.isNotEmpty(layer)) {
- query.appendCondition(eq(ServiceTraffic.LAYER,
Layer.valueOf(layer).value()));
+ query.and(eq(ServiceTraffic.LAYER,
Layer.valueOf(layer).value()));
}
}
});
@@ -102,13 +99,12 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
ServiceTraffic.SHORT_NAME,
ServiceTraffic.GROUP,
ServiceTraffic.LAYER,
- ServiceTraffic.SERVICE_ID,
- MetadataRegistry.ID),
+ ServiceTraffic.SERVICE_ID),
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(serviceId)) {
-
query.appendCondition(eq(ServiceTraffic.SERVICE_ID, serviceId));
+ query.and(eq(ServiceTraffic.SERVICE_ID,
serviceId));
}
}
});
@@ -131,16 +127,15 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
InstanceTraffic.LAYER,
InstanceTraffic.PROPERTIES,
InstanceTraffic.LAST_PING_TIME_BUCKET,
- InstanceTraffic.SERVICE_ID,
- MetadataRegistry.ID),
+ InstanceTraffic.SERVICE_ID),
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(serviceId)) {
-
query.appendCondition(eq(InstanceTraffic.SERVICE_ID, serviceId));
+ query.and(eq(InstanceTraffic.SERVICE_ID,
serviceId));
}
-
query.appendCondition(gte(InstanceTraffic.LAST_PING_TIME_BUCKET,
minuteTimeBucket));
+ query.and(gte(InstanceTraffic.LAST_PING_TIME_BUCKET,
minuteTimeBucket));
}
});
@@ -158,14 +153,13 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
MeasureQueryResponse resp = query(InstanceTraffic.INDEX_NAME,
ImmutableSet.of(InstanceTraffic.NAME,
InstanceTraffic.LAYER,
- InstanceTraffic.PROPERTIES,
- MetadataRegistry.ID),
+ InstanceTraffic.PROPERTIES),
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(instanceId)) {
-
query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID,
instanceId));
+ query.andWithID(instanceId);
}
}
});
@@ -177,14 +171,13 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
public List<Endpoint> findEndpoint(String keyword, String serviceId, int
limit) throws IOException {
MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
ImmutableSet.of(EndpointTraffic.NAME,
- EndpointTraffic.SERVICE_ID,
- MetadataRegistry.ID),
+ EndpointTraffic.SERVICE_ID),
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(serviceId)) {
-
query.appendCondition(eq(EndpointTraffic.SERVICE_ID, serviceId));
+ query.and(eq(EndpointTraffic.SERVICE_ID,
serviceId));
}
}
});
@@ -213,29 +206,28 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
ProcessTraffic.PROPERTIES,
ProcessTraffic.LABELS_JSON,
ProcessTraffic.LAST_PING_TIME_BUCKET,
- ProcessTraffic.PROFILING_SUPPORT_STATUS,
- MetadataRegistry.ID),
+ ProcessTraffic.PROFILING_SUPPORT_STATUS),
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(serviceId)) {
-
query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId));
+ query.and(eq(ProcessTraffic.SERVICE_ID,
serviceId));
}
if (StringUtil.isNotEmpty(instanceId)) {
-
query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+ query.and(eq(ProcessTraffic.INSTANCE_ID,
instanceId));
}
if (StringUtil.isNotEmpty(agentId)) {
- query.appendCondition(eq(ProcessTraffic.AGENT_ID,
instanceId));
+ query.and(eq(ProcessTraffic.AGENT_ID, instanceId));
}
if (lastPingStartTimeBucket > 0) {
-
query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingStartTimeBucket));
+
query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
}
if (lastPingEndTimeBucket > 0) {
-
query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingEndTimeBucket));
+
query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket));
}
if (profilingSupportStatus != null) {
-
query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS,
profilingSupportStatus.value()));
+
query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS,
profilingSupportStatus.value()));
}
}
});
@@ -261,29 +253,28 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
ProcessTraffic.PROPERTIES,
ProcessTraffic.LABELS_JSON,
ProcessTraffic.LAST_PING_TIME_BUCKET,
- ProcessTraffic.PROFILING_SUPPORT_STATUS,
- MetadataRegistry.ID),
+ ProcessTraffic.PROFILING_SUPPORT_STATUS),
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(serviceId)) {
-
query.appendCondition(eq(ProcessTraffic.SERVICE_ID, serviceId));
+ query.and(eq(ProcessTraffic.SERVICE_ID,
serviceId));
}
if (StringUtil.isNotEmpty(instanceId)) {
-
query.appendCondition(eq(ProcessTraffic.INSTANCE_ID, instanceId));
+ query.and(eq(ProcessTraffic.INSTANCE_ID,
instanceId));
}
if (StringUtil.isNotEmpty(agentId)) {
- query.appendCondition(eq(ProcessTraffic.AGENT_ID,
instanceId));
+ query.and(eq(ProcessTraffic.AGENT_ID, instanceId));
}
if (lastPingStartTimeBucket > 0) {
-
query.appendCondition(gte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingStartTimeBucket));
+
query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
}
if (lastPingEndTimeBucket > 0) {
-
query.appendCondition(lte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingEndTimeBucket));
+
query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket));
}
if (profilingSupportStatus != null) {
-
query.appendCondition(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS,
profilingSupportStatus.value()));
+
query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS,
profilingSupportStatus.value()));
}
}
});
@@ -304,14 +295,13 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
ProcessTraffic.LAYER,
ProcessTraffic.DETECT_TYPE,
ProcessTraffic.PROPERTIES,
- ProcessTraffic.LABELS_JSON,
- MetadataRegistry.ID),
+ ProcessTraffic.LABELS_JSON),
Collections.emptySet(),
new QueryBuilder<MeasureQuery>() {
@Override
protected void apply(MeasureQuery query) {
if (StringUtil.isNotEmpty(processId)) {
-
query.appendCondition(PairQueryCondition.IDQueryCondition.eq(MetadataRegistry.ID,
processId));
+ query.andWithID(processId);
}
}
});
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
index ef1b7768a6..42a219b96f 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBAlarmQueryDAO.java
@@ -62,7 +62,7 @@ public class BanyanDBAlarmQueryDAO extends
AbstractBanyanDBDAO implements IAlarm
@Override
public void apply(StreamQuery query) {
if (Objects.nonNull(scopeId)) {
- query.appendCondition(eq(AlarmRecord.SCOPE, (long)
scopeId));
+ query.and(eq(AlarmRecord.SCOPE, (long) scopeId));
}
// TODO: support keyword search
@@ -70,7 +70,7 @@ public class BanyanDBAlarmQueryDAO extends
AbstractBanyanDBDAO implements IAlarm
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
// TODO: check whether tags in the alarm are
indexed
- query.appendCondition(eq(tag.getKey(),
tag.getValue()));
+ query.and(eq(tag.getKey(), tag.getValue()));
}
}
query.setLimit(limit);
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
index c032d864ad..7db8561463 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBBrowserLogQueryDAO.java
@@ -58,18 +58,18 @@ public class BanyanDBBrowserLogQueryDAO extends
AbstractBanyanDBDAO implements I
BrowserErrorLogRecord.ERROR_CATEGORY,
BrowserErrorLogRecord.DATA_BINARY), tsRange, new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
- query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_ID,
serviceId));
+ query.and(eq(BrowserErrorLogRecord.SERVICE_ID, serviceId));
if (StringUtil.isNotEmpty(serviceVersionId)) {
-
query.appendCondition(eq(BrowserErrorLogRecord.SERVICE_VERSION_ID,
serviceVersionId));
+ query.and(eq(BrowserErrorLogRecord.SERVICE_VERSION_ID,
serviceVersionId));
}
if (StringUtil.isNotEmpty(pagePathId)) {
-
query.appendCondition(eq(BrowserErrorLogRecord.PAGE_PATH_ID, pagePathId));
+ query.and(eq(BrowserErrorLogRecord.PAGE_PATH_ID,
pagePathId));
}
if (Objects.nonNull(category)) {
-
query.appendCondition(eq(BrowserErrorLogRecord.ERROR_CATEGORY,
category.getValue()));
+ query.and(eq(BrowserErrorLogRecord.ERROR_CATEGORY,
category.getValue()));
}
query.setOffset(from);
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
index 2274e450fd..35d97b7c08 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java
@@ -62,31 +62,31 @@ public class BanyanDBLogQueryDAO extends
AbstractBanyanDBDAO implements ILogQuer
@Override
public void apply(StreamQuery query) {
if (StringUtil.isNotEmpty(serviceId)) {
- query.appendCondition(eq(AbstractLogRecord.SERVICE_ID,
serviceId));
+ query.and(eq(AbstractLogRecord.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
-
query.appendCondition(eq(AbstractLogRecord.SERVICE_INSTANCE_ID,
serviceInstanceId));
+ query.and(eq(AbstractLogRecord.SERVICE_INSTANCE_ID,
serviceInstanceId));
}
if (StringUtil.isNotEmpty(endpointId)) {
- query.appendCondition(eq(AbstractLogRecord.ENDPOINT_ID,
endpointId));
+ query.and(eq(AbstractLogRecord.ENDPOINT_ID, endpointId));
}
if (Objects.nonNull(relatedTrace)) {
if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
- query.appendCondition(eq(AbstractLogRecord.TRACE_ID,
relatedTrace.getTraceId()));
+ query.and(eq(AbstractLogRecord.TRACE_ID,
relatedTrace.getTraceId()));
}
if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
-
query.appendCondition(eq(AbstractLogRecord.TRACE_SEGMENT_ID,
relatedTrace.getSegmentId()));
+ query.and(eq(AbstractLogRecord.TRACE_SEGMENT_ID,
relatedTrace.getSegmentId()));
}
if (Objects.nonNull(relatedTrace.getSpanId())) {
- query.appendCondition(eq(AbstractLogRecord.SPAN_ID,
(long) relatedTrace.getSpanId()));
+ query.and(eq(AbstractLogRecord.SPAN_ID, (long)
relatedTrace.getSpanId()));
}
}
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
// TODO: check log indexed tags
- query.appendCondition(eq(tag.getKey(),
tag.getValue()));
+ query.and(eq(tag.getKey(), tag.getValue()));
}
}
}
@@ -98,9 +98,15 @@ public class BanyanDBLogQueryDAO extends AbstractBanyanDBDAO
implements ILogQuer
}
StreamQueryResponse resp = query(LogRecord.INDEX_NAME,
- ImmutableSet.of(AbstractLogRecord.SERVICE_ID,
AbstractLogRecord.SERVICE_INSTANCE_ID,
- AbstractLogRecord.ENDPOINT_ID,
AbstractLogRecord.TRACE_ID, AbstractLogRecord.TRACE_SEGMENT_ID,
- AbstractLogRecord.SPAN_ID,
AbstractLogRecord.CONTENT_TYPE, AbstractLogRecord.CONTENT,
+ ImmutableSet.of(AbstractLogRecord.SERVICE_ID,
+ AbstractLogRecord.SERVICE_INSTANCE_ID,
+ AbstractLogRecord.ENDPOINT_ID,
+ AbstractLogRecord.TRACE_ID,
+ AbstractLogRecord.TRACE_SEGMENT_ID,
+ AbstractLogRecord.SPAN_ID,
+ AbstractLogRecord.TIMESTAMP,
+ AbstractLogRecord.CONTENT_TYPE,
+ AbstractLogRecord.CONTENT,
AbstractLogRecord.TAGS_RAW_DATA), tsRange, query);
Logs logs = new Logs();
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
index b3d8ed77e9..fcf7228de3 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBProfileThreadSnapshotQueryDAO.java
@@ -30,7 +30,6 @@ import
org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThr
import org.apache.skywalking.oap.server.library.util.BooleanUtils;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
-import
org.apache.skywalking.oap.server.storage.plugin.banyandb.MetadataRegistry;
import java.io.IOException;
import java.util.ArrayList;
@@ -60,8 +59,8 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
-
query.appendCondition(eq(ProfileThreadSnapshotRecord.TASK_ID, taskId))
-
.appendCondition(eq(ProfileThreadSnapshotRecord.SEQUENCE, 0L));
+ query.and(eq(ProfileThreadSnapshotRecord.TASK_ID,
taskId))
+ .and(eq(ProfileThreadSnapshotRecord.SEQUENCE,
0L));
}
});
@@ -82,7 +81,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery traceQuery) {
-
traceQuery.appendCondition(eq(SegmentRecord.SEGMENT_ID, segmentID));
+ traceQuery.and(eq(SegmentRecord.SEGMENT_ID,
segmentID));
}
});
@@ -133,9 +132,9 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
-
query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
-
.appendCondition(lte(ProfileThreadSnapshotRecord.SEQUENCE, maxSequence))
-
.appendCondition(gte(ProfileThreadSnapshotRecord.SEQUENCE, minSequence));
+ query.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID,
segmentId))
+ .and(lte(ProfileThreadSnapshotRecord.SEQUENCE,
maxSequence))
+ .and(gte(ProfileThreadSnapshotRecord.SEQUENCE,
minSequence));
}
});
@@ -158,7 +157,7 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
- query.appendCondition(eq(SegmentRecord.INDEX_NAME,
segmentId));
+ query.and(eq(SegmentRecord.INDEX_NAME, segmentId));
}
});
@@ -192,9 +191,9 @@ public class BanyanDBProfileThreadSnapshotQueryDAO extends
AbstractBanyanDBDAO i
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
-
query.appendCondition(eq(ProfileThreadSnapshotRecord.SEGMENT_ID, segmentId))
-
.appendCondition(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end))
-
.appendCondition(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start));
+ query.and(eq(ProfileThreadSnapshotRecord.SEGMENT_ID,
segmentId))
+
.and(lte(ProfileThreadSnapshotRecord.DUMP_TIME, end))
+
.and(gte(ProfileThreadSnapshotRecord.DUMP_TIME, start));
}
});
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
index c2035cbc66..c0b121450c 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBTraceQueryDAO.java
@@ -57,31 +57,31 @@ public class BanyanDBTraceQueryDAO extends
AbstractBanyanDBDAO implements ITrace
public void apply(StreamQuery query) {
if (minDuration != 0) {
// duration >= minDuration
- query.appendCondition(gte(SegmentRecord.LATENCY,
minDuration));
+ query.and(gte(SegmentRecord.LATENCY, minDuration));
}
if (maxDuration != 0) {
// duration <= maxDuration
- query.appendCondition(lte(SegmentRecord.LATENCY,
maxDuration));
+ query.and(lte(SegmentRecord.LATENCY, maxDuration));
}
if (!Strings.isNullOrEmpty(serviceId)) {
- query.appendCondition(eq(SegmentRecord.SERVICE_ID,
serviceId));
+ query.and(eq(SegmentRecord.SERVICE_ID, serviceId));
}
if (!Strings.isNullOrEmpty(serviceInstanceId)) {
-
query.appendCondition(eq(SegmentRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
+ query.and(eq(SegmentRecord.SERVICE_INSTANCE_ID,
serviceInstanceId));
}
if (!Strings.isNullOrEmpty(endpointId)) {
- query.appendCondition(eq(SegmentRecord.ENDPOINT_ID,
endpointId));
+ query.and(eq(SegmentRecord.ENDPOINT_ID, endpointId));
}
switch (traceState) {
case ERROR:
- query.appendCondition(eq(SegmentRecord.IS_ERROR,
BooleanUtils.TRUE));
+ query.and(eq(SegmentRecord.IS_ERROR,
BooleanUtils.TRUE));
break;
case SUCCESS:
- query.appendCondition(eq(SegmentRecord.IS_ERROR,
BooleanUtils.FALSE));
+ query.and(eq(SegmentRecord.IS_ERROR,
BooleanUtils.FALSE));
break;
}
@@ -97,7 +97,7 @@ public class BanyanDBTraceQueryDAO extends
AbstractBanyanDBDAO implements ITrace
if (CollectionUtils.isNotEmpty(tags)) {
for (final Tag tag : tags) {
// TODO: check if we have this tag indexed?
- query.appendCondition(eq(tag.getKey(),
tag.getValue()));
+ query.and(eq(tag.getKey(), tag.getValue()));
}
}
@@ -160,7 +160,7 @@ public class BanyanDBTraceQueryDAO extends
AbstractBanyanDBDAO implements ITrace
new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
- query.appendCondition(eq(SegmentRecord.TRACE_ID,
traceId));
+ query.and(eq(SegmentRecord.TRACE_ID, traceId));
}
});