This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
The following commit(s) were added to refs/heads/main by this push:
new 5fed3cc Use client-local metadata cache (#42)
5fed3cc is described below
commit 5fed3cc07c715db87ae25146a19111be24c27ae9
Author: Jiajing LU <[email protected]>
AuthorDate: Mon Jun 19 09:13:30 2023 +0800
Use client-local metadata cache (#42)
---
CHANGES.md | 1 +
README.md | 4 +-
.../banyandb/v1/client/AbstractQuery.java | 13 +++----
.../banyandb/v1/client/AbstractWrite.java | 33 ++++++-----------
.../banyandb/v1/client/BanyanDBClient.java | 43 +++++++++++++++++++---
.../banyandb/v1/client/MeasureQuery.java | 13 +++++--
.../banyandb/v1/client/MeasureWrite.java | 5 ++-
.../skywalking/banyandb/v1/client/StreamQuery.java | 8 +++-
.../skywalking/banyandb/v1/client/StreamWrite.java | 9 +++--
.../banyandb/v1/client/metadata/MetadataCache.java | 12 +++---
.../v1/client/BanyanDBClientMeasureWriteTest.java | 2 +-
.../v1/client/BanyanDBClientStreamWriteTest.java | 4 +-
.../v1/client/ITBanyanDBMeasureQueryTests.java | 2 +-
.../v1/client/ITBanyanDBStreamQueryTests.java | 2 +-
14 files changed, 90 insertions(+), 61 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5f403a7..d3249f3 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -8,6 +8,7 @@ Release Notes.
### Features
* Support new TopN query protocol
* Remove ID type of TAG
+* Make the global singleton MetadataCache client-local
0.3.1
------------------
diff --git a/README.md b/README.md
index fed2c99..895ef24 100644
--- a/README.md
+++ b/README.md
@@ -254,7 +254,7 @@ the order of tags must exactly be the same with that
defined in the schema.
And the non-existing tags must be filled in (with NullValue) instead of
compacting all non-null tag values.
```java
-StreamWrite streamWrite = new StreamWrite("default", "sw", segmentId,
now.toEpochMilli())
+StreamWrite streamWrite = client.createStreamWrite("default", "sw", segmentId,
now.toEpochMilli())
.tag("data_binary", Value.binaryTagValue(byteData))
.tag("trace_id", Value.stringTagValue(traceId)) // 0
.tag("state", Value.longTagValue(state)) // 1
@@ -287,7 +287,7 @@ A `BulkWriteProcessor` is created by calling
`buildMeasureWriteProcessor`. Then
```java
Instant now = Instant.now();
-MeasureWrite measureWrite = new MeasureWrite("sw_metric",
"service_cpm_minute", now.toEpochMilli());
+MeasureWrite measureWrite = client.createMeasureWrite("sw_metric",
"service_cpm_minute", now.toEpochMilli());
measureWrite.tag("id", TagAndValue.idTagValue("1"))
.tag("entity_id", TagAndValue.stringTagValue("entity_1"))
.field("total", TagAndValue.longFieldValue(100))
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
index afe74eb..d2788c4 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java
@@ -58,8 +58,6 @@ public abstract class AbstractQuery<T> {
*/
protected final Set<String> tagProjections;
- @Getter(AccessLevel.PACKAGE)
- protected final MetadataCache.EntityMetadata metadata;
/**
* Query criteria.
*/
@@ -71,7 +69,6 @@ public abstract class AbstractQuery<T> {
this.timestampRange = timestampRange;
this.conditions = new ArrayList<>(10);
this.tagProjections = tagProjections;
- this.metadata = MetadataCache.INSTANCE.findMetadata(this.group,
this.name);
}
/**
@@ -108,7 +105,7 @@ public abstract class AbstractQuery<T> {
* @return QueryRequest for gRPC level query.
* @throws BanyanDBException thrown from entity build, e.g. invalid
reference to non-exist fields or tags.
*/
- abstract T build() throws BanyanDBException;
+ abstract T build(MetadataCache.EntityMetadata entityMetadata) throws
BanyanDBException;
protected BanyandbCommon.Metadata buildMetadata() {
return BanyandbCommon.Metadata.newBuilder()
@@ -139,14 +136,14 @@ public abstract class AbstractQuery<T> {
}, (first, second) -> second));
}
- protected BanyandbModel.TagProjection buildTagProjections() throws
BanyanDBException {
- return this.buildTagProjections(this.tagProjections);
+ protected BanyandbModel.TagProjection
buildTagProjections(MetadataCache.EntityMetadata entityMetadata) throws
BanyanDBException {
+ return this.buildTagProjections(entityMetadata, this.tagProjections);
}
- protected BanyandbModel.TagProjection buildTagProjections(Iterable<String>
tagProjections) throws BanyanDBException {
+ protected BanyandbModel.TagProjection
buildTagProjections(MetadataCache.EntityMetadata entityMetadata,
Iterable<String> tagProjections) throws BanyanDBException {
final ListMultimap<String, String> projectionMap =
ArrayListMultimap.create();
for (final String tagName : tagProjections) {
- final Optional<MetadataCache.TagInfo> tagInfo =
this.metadata.findTagInfo(tagName);
+ final Optional<MetadataCache.TagInfo> tagInfo =
entityMetadata.findTagInfo(tagName);
if (!tagInfo.isPresent()) {
throw InvalidReferenceException.fromInvalidTag(tagName);
}
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
index 4e07af9..dd317d7 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
@@ -22,6 +22,7 @@ import com.google.protobuf.Timestamp;
import java.util.Map;
import java.util.Optional;
+
import lombok.Getter;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
@@ -31,16 +32,6 @@ import
org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessageV3> {
- /**
- * Group name of the current entity
- */
- @Getter
- protected final String group;
- /**
- * Owner name of the current entity
- */
- @Getter
- protected final String name;
/**
* Timestamp represents the time of current stream
* in the timeunit of milliseconds.
@@ -52,22 +43,20 @@ public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessa
protected final MetadataCache.EntityMetadata entityMetadata;
- public AbstractWrite(String group, String name, long timestamp) {
- this.group = group;
- this.name = name;
- this.timestamp = timestamp;
- this.entityMetadata = MetadataCache.INSTANCE.findMetadata(group, name);
- if (this.entityMetadata == null) {
+ public AbstractWrite(MetadataCache.EntityMetadata entityMetadata, long
timestamp) {
+ if (entityMetadata == null) {
throw new IllegalArgumentException("metadata not found");
}
+ this.entityMetadata = entityMetadata;
+ this.timestamp = timestamp;
this.tags = new Object[this.entityMetadata.getTotalTags()];
}
/**
- * Build a write without initial timestamp.
+ * Build a write request without initial timestamp.
*/
- public AbstractWrite(String group, String name) {
- this(group, name, 0);
+ AbstractWrite(MetadataCache.EntityMetadata entityMetadata) {
+ this(entityMetadata, 0);
}
public AbstractWrite<P> tag(String tagName,
Serializable<BanyandbModel.TagValue> tagValue) throws BanyanDBException {
@@ -85,7 +74,7 @@ public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessa
}
BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder()
- .setGroup(this.group).setName(this.name).build();
+
.setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).build();
Timestamp ts = Timestamp.newBuilder()
.setSeconds(timestamp / 1000)
.setNanos((int) (timestamp % 1000 * 1_000_000)).build();
@@ -97,8 +86,8 @@ public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessa
@Override
public String toString() {
StringBuilder stringBuilder = new StringBuilder();
- stringBuilder.append("group=").append(group).append(",
").append("name=")
- .append(name).append(",
").append("timestamp=").append(timestamp).append(", ");
+
stringBuilder.append("group=").append(entityMetadata.getGroup()).append(",
").append("name=")
+ .append(entityMetadata.getName()).append(",
").append("timestamp=").append(timestamp).append(", ");
for (int i = 0; i < this.tags.length; i++) {
final int index = i;
Map<String, MetadataCache.TagInfo> tagMap =
this.entityMetadata.getTagOffset();
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 4fbe8ea..04a16f3 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -125,6 +125,11 @@ public class BanyanDBClient implements Closeable {
*/
private final ReentrantLock connectionEstablishLock;
+ /**
+ * Client local metadata cache.
+ */
+ private final MetadataCache metadataCache;
+
/**
* Create a BanyanDB client instance with default options.
*
@@ -147,6 +152,7 @@ public class BanyanDBClient implements Closeable {
this.port = port;
this.options = options;
this.connectionEstablishLock = new ReentrantLock();
+ this.metadataCache = new MetadataCache();
}
/**
@@ -251,6 +257,31 @@ public class BanyanDBClient implements Closeable {
return new MeasureBulkWriteProcessor(this.measureServiceStub,
maxBulkSize, flushInterval, concurrency);
}
+ /**
+ * Build a MeasureWrite request.
+ *
+ * @param group the group of the measure
+ * @param name the name of the measure
+ * @param timestamp the timestamp of the measure
+ * @return the request to be built
+ */
+ public MeasureWrite createMeasureWrite(String group, String name, long
timestamp) {
+ return new MeasureWrite(this.metadataCache.findMetadata(group, name),
timestamp);
+ }
+
+ /**
+ * Build a StreamWrite request.
+ *
+ * @param group the group of the stream
+ * @param name the name of the stream
+ * @param elementId the primary key of the stream
+ * @param timestamp the timestamp of the stream
+ * @return the request to be built
+ */
+ public StreamWrite createStreamWrite(String group, String name, final
String elementId, long timestamp) {
+ return new StreamWrite(this.metadataCache.findMetadata(group, name),
elementId, timestamp);
+ }
+
/**
* Query streams according to given conditions
*
@@ -263,7 +294,7 @@ public class BanyanDBClient implements Closeable {
final BanyandbStream.QueryResponse response =
HandleExceptionsWith.callAndTranslateApiException(() ->
this.streamServiceBlockingStub
.withDeadlineAfter(this.getOptions().getDeadline(),
TimeUnit.SECONDS)
- .query(streamQuery.build()));
+
.query(streamQuery.build(this.metadataCache.findMetadata(streamQuery.group,
streamQuery.name))));
return new StreamQueryResponse(response);
}
@@ -295,7 +326,7 @@ public class BanyanDBClient implements Closeable {
final BanyandbMeasure.QueryResponse response =
HandleExceptionsWith.callAndTranslateApiException(() ->
this.measureServiceBlockingStub
.withDeadlineAfter(this.getOptions().getDeadline(),
TimeUnit.SECONDS)
- .query(measureQuery.build()));
+
.query(measureQuery.build(this.metadataCache.findMetadata(measureQuery.group,
measureQuery.name))));
return new MeasureQueryResponse(response);
}
@@ -320,7 +351,7 @@ public class BanyanDBClient implements Closeable {
StreamMetadataRegistry streamRegistry = new
StreamMetadataRegistry(checkNotNull(this.channel));
streamRegistry.create(stream);
defineIndexRules(stream, stream.indexRules());
- MetadataCache.INSTANCE.register(stream);
+ this.metadataCache.register(stream);
}
/**
@@ -332,7 +363,7 @@ public class BanyanDBClient implements Closeable {
MeasureMetadataRegistry measureRegistry = new
MeasureMetadataRegistry(checkNotNull(this.channel));
measureRegistry.create(measure);
defineIndexRules(measure, measure.indexRules());
- MetadataCache.INSTANCE.register(measure);
+ this.metadataCache.register(measure);
}
/**
@@ -539,7 +570,7 @@ public class BanyanDBClient implements Closeable {
Stream s = new
StreamMetadataRegistry(checkNotNull(this.channel)).get(group, name);
s = s.withIndexRules(findIndexRulesByGroupAndBindingName(group,
IndexRuleBinding.defaultBindingRule(name)));
- MetadataCache.INSTANCE.register(s);
+ this.metadataCache.register(s);
return s;
}
@@ -556,7 +587,7 @@ public class BanyanDBClient implements Closeable {
Measure m = new
MeasureMetadataRegistry(checkNotNull(this.channel)).get(group, name);
m = m.withIndexRules(findIndexRulesByGroupAndBindingName(group,
IndexRuleBinding.defaultBindingRule(name)));
- MetadataCache.INSTANCE.register(m);
+ this.metadataCache.register(m);
return m;
}
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
index 4ae04af..63e83b7 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java
@@ -25,6 +25,7 @@ import lombok.Setter;
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import java.util.Set;
@@ -142,7 +143,11 @@ public class MeasureQuery extends
AbstractQuery<BanyandbMeasure.QueryRequest> {
/**
* @return QueryRequest for gRPC level query.
*/
- BanyandbMeasure.QueryRequest build() throws BanyanDBException {
+ @Override
+ BanyandbMeasure.QueryRequest build(MetadataCache.EntityMetadata
entityMetadata) throws BanyanDBException {
+ if (entityMetadata == null) {
+ throw new IllegalArgumentException("entity metadata is null");
+ }
final BanyandbMeasure.QueryRequest.Builder builder =
BanyandbMeasure.QueryRequest.newBuilder();
builder.setMetadata(buildMetadata());
if (timestampRange != null) {
@@ -150,9 +155,9 @@ public class MeasureQuery extends
AbstractQuery<BanyandbMeasure.QueryRequest> {
} else {
builder.setTimeRange(TimestampRange.MAX_RANGE);
}
- BanyandbModel.TagProjection tagProjections = buildTagProjections();
+ BanyandbModel.TagProjection tagProjections =
buildTagProjections(entityMetadata);
if (tagProjections.getTagFamiliesCount() > 0) {
- builder.setTagProjection(buildTagProjections());
+ builder.setTagProjection(buildTagProjections(entityMetadata));
}
if (!fieldProjections.isEmpty()) {
builder.setFieldProjection(BanyandbMeasure.QueryRequest.FieldProjection.newBuilder()
@@ -161,7 +166,7 @@ public class MeasureQuery extends
AbstractQuery<BanyandbMeasure.QueryRequest> {
}
if (this.aggregation != null) {
BanyandbMeasure.QueryRequest.GroupBy.Builder groupByBuilder =
BanyandbMeasure.QueryRequest.GroupBy.newBuilder()
-
.setTagProjection(buildTagProjections(this.aggregation.groupByTagsProjection));
+ .setTagProjection(buildTagProjections(entityMetadata,
this.aggregation.groupByTagsProjection));
if (Strings.isNullOrEmpty(this.aggregation.fieldName)) {
if (this.aggregation.aggregationType !=
Aggregation.Type.UNSPECIFIED) {
throw new IllegalArgumentException("field name cannot be
null or empty if aggregation is specified");
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
index e0c5ee4..7510128 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
@@ -23,6 +23,7 @@ import
org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
import java.util.Deque;
@@ -35,8 +36,8 @@ public class MeasureWrite extends
AbstractWrite<BanyandbMeasure.WriteRequest> {
*/
private final Object[] fields;
- public MeasureWrite(final String group, final String name, long timestamp)
{
- super(group, name, timestamp);
+ MeasureWrite(MetadataCache.EntityMetadata entityMetadata, long timestamp) {
+ super(entityMetadata, timestamp);
this.fields = new Object[this.entityMetadata.getTotalFields()];
}
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
index b46665b..1c453f7 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java
@@ -23,6 +23,7 @@ import java.util.Set;
import lombok.Setter;
import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
/**
* StreamQuery is the high-level query API for the stream model.
@@ -63,13 +64,16 @@ public class StreamQuery extends
AbstractQuery<BanyandbStream.QueryRequest> {
}
@Override
- BanyandbStream.QueryRequest build() throws BanyanDBException {
+ BanyandbStream.QueryRequest build(MetadataCache.EntityMetadata
entityMetadata) throws BanyanDBException {
+ if (entityMetadata == null) {
+ throw new IllegalArgumentException("entity metadata is null");
+ }
final BanyandbStream.QueryRequest.Builder builder =
BanyandbStream.QueryRequest.newBuilder()
.setMetadata(buildMetadata());
if (timestampRange != null) {
builder.setTimeRange(timestampRange.build());
}
- builder.setProjection(buildTagProjections());
+ builder.setProjection(buildTagProjections(entityMetadata));
buildCriteria().ifPresent(builder::setCriteria);
builder.setOffset(offset);
builder.setLimit(limit);
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
index 78f60ab..a9f3c2c 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
@@ -26,6 +26,7 @@ import
org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
import
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
+import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
import org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
/**
@@ -39,16 +40,16 @@ public class StreamWrite extends
AbstractWrite<BanyandbStream.WriteRequest> {
@Getter
private final String elementId;
- public StreamWrite(final String group, final String name, final String
elementId, long timestamp) {
- super(group, name, timestamp);
+ StreamWrite(MetadataCache.EntityMetadata entityMetadata, final String
elementId, long timestamp) {
+ super(entityMetadata, timestamp);
this.elementId = elementId;
}
/**
* Create a StreamWrite without initial timestamp.
*/
- public StreamWrite(final String group, final String name, final String
elementId) {
- super(group, name);
+ StreamWrite(MetadataCache.EntityMetadata entityMetadata, final String
elementId) {
+ super(entityMetadata);
this.elementId = elementId;
}
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MetadataCache.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MetadataCache.java
index f853513..e57c280 100644
---
a/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MetadataCache.java
+++
b/src/main/java/org/apache/skywalking/banyandb/v1/client/metadata/MetadataCache.java
@@ -27,12 +27,10 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-public enum MetadataCache {
- INSTANCE;
-
+public class MetadataCache {
private final Map<String, EntityMetadata> cache;
- MetadataCache() {
+ public MetadataCache() {
this.cache = new CopyOnWriteMap<>();
}
@@ -67,7 +65,7 @@ public enum MetadataCache {
tagInfo.put(s.tagFamilies().get(i).tagSpecs().get(j).getTagName(), new
TagInfo(tagFamilyName, k++));
}
}
- return new EntityMetadata(totalTags, 0, tagFamilyCapacity,
+ return new EntityMetadata(s.group(), s.name(), totalTags, 0,
tagFamilyCapacity,
Collections.unmodifiableMap(tagInfo),
Collections.emptyMap());
}
@@ -89,13 +87,15 @@ public enum MetadataCache {
for (int i = 0; i < m.fields().size(); i++) {
fieldOffset.put(m.fields().get(i).getName(), i);
}
- return new EntityMetadata(totalTags, m.fields().size(),
tagFamilyCapacity,
+ return new EntityMetadata(m.group(), m.name(), totalTags,
m.fields().size(), tagFamilyCapacity,
Collections.unmodifiableMap(tagOffset),
Collections.unmodifiableMap(fieldOffset));
}
@Getter
@RequiredArgsConstructor
public static class EntityMetadata {
+ private final String group;
+ private final String name;
private final int totalTags;
private final int totalFields;
diff --git
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
index 2f4fc88..7bb8437 100644
---
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
+++
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientMeasureWriteTest.java
@@ -108,7 +108,7 @@ public class BanyanDBClientMeasureWriteTest extends
AbstractBanyanDBClientTest {
serviceRegistry.addService(serviceImpl);
Instant now = Instant.now();
- MeasureWrite measureWrite = new MeasureWrite("sw_metric",
"service_cpm_minute", now.toEpochMilli());
+ MeasureWrite measureWrite = client.createMeasureWrite("sw_metric",
"service_cpm_minute", now.toEpochMilli());
measureWrite.tag("entity_id", TagAndValue.stringTagValue("entity_1"))
.field("total", TagAndValue.longFieldValue(100))
.field("value", TagAndValue.longFieldValue(1));
diff --git
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
index 3d80b40..2827987 100644
---
a/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
+++
b/src/test/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClientStreamWriteTest.java
@@ -155,7 +155,7 @@ public class BanyanDBClientStreamWriteTest extends
AbstractBanyanDBClientTest {
String httpStatusCode = "200";
String dbType = "SQL";
- StreamWrite streamWrite = new StreamWrite("default", "sw", segmentId,
now.toEpochMilli())
+ StreamWrite streamWrite = client.createStreamWrite("default", "sw",
segmentId, now.toEpochMilli())
.tag("data_binary", Value.binaryTagValue(byteData))
.tag("trace_id", Value.stringTagValue(traceId)) // 0
.tag("state", Value.longTagValue(state)) // 1
@@ -235,7 +235,7 @@ public class BanyanDBClientStreamWriteTest extends
AbstractBanyanDBClientTest {
String dbType = "SQL";
String dbInstance = "127.0.0.1:3306";
- StreamWrite streamWrite = new StreamWrite("default", "sw", segmentId,
now.toEpochMilli())
+ StreamWrite streamWrite = client.createStreamWrite("default", "sw",
segmentId, now.toEpochMilli())
.tag("data_binary", Value.binaryTagValue(byteData))
.tag("trace_id", Value.stringTagValue(traceId)) // 0
.tag("state", Value.longTagValue(state)) // 1
diff --git
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
index 1f310f6..3f8c731 100644
---
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
+++
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
@@ -70,7 +70,7 @@ public class ITBanyanDBMeasureQueryTests extends
BanyanDBClientTestCI {
Instant now = Instant.now();
Instant begin = now.minus(15, ChronoUnit.MINUTES);
- MeasureWrite measureWrite = new MeasureWrite("sw_metric",
"service_cpm_minute", now.toEpochMilli());
+ MeasureWrite measureWrite = client.createMeasureWrite("sw_metric",
"service_cpm_minute", now.toEpochMilli());
measureWrite.tag("entity_id",
TagAndValue.stringTagValue("entity_1")).field("total",
TagAndValue.longFieldValue(100)).field("value", TagAndValue.longFieldValue(1));
CompletableFuture<Void> f = processor.add(measureWrite);
diff --git
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
index 7619bd8..9088ebe 100644
---
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
+++
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
@@ -106,7 +106,7 @@ public class ITBanyanDBStreamQueryTests extends
BanyanDBClientTestCI {
String dbType = "SQL";
String dbInstance = "127.0.0.1:3306";
- StreamWrite streamWrite = new StreamWrite("default", "sw", segmentId,
now.toEpochMilli())
+ StreamWrite streamWrite = client.createStreamWrite("default", "sw",
segmentId, now.toEpochMilli())
.tag("data_binary", Value.binaryTagValue(byteData))
.tag("trace_id", Value.stringTagValue(traceId)) // 0
.tag("state", Value.longTagValue(state)) // 1