This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository
https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
The following commit(s) were added to refs/heads/main by this push:
new c75d56f Optimize APIs, remove unnecessary ts parameter of trace model
(#95)
c75d56f is described below
commit c75d56f28920dc8e8f2938e4bebe4455223c2d29
Author: 吴晟 Wu Sheng <[email protected]>
AuthorDate: Fri Sep 5 10:25:16 2025 +0800
Optimize APIs, remove unnecessary ts parameter of trace model (#95)
---
.../banyandb/v1/client/AbstractWrite.java | 39 ++++++++++------------
.../banyandb/v1/client/BanyanDBClient.java | 29 ----------------
.../banyandb/v1/client/MeasureWrite.java | 9 ++++-
.../skywalking/banyandb/v1/client/StreamWrite.java | 19 +++++++----
.../skywalking/banyandb/v1/client/TraceWrite.java | 15 ++-------
.../v1/client/ITBanyanDBStreamQueryTests.java | 3 +-
.../skywalking/banyandb/v1/client/ITTraceTest.java | 8 ++---
7 files changed, 46 insertions(+), 76 deletions(-)
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
index 72067da..bae244d 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
@@ -18,8 +18,6 @@
package org.apache.skywalking.banyandb.v1.client;
-import com.google.protobuf.Timestamp;
-
import java.util.Map;
import java.util.Optional;
@@ -33,30 +31,35 @@ import
org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessageV3> {
/**
- * Timestamp represents the time of current stream
- * in the timeunit of milliseconds.
+ * Timestamp represents the time of the current data point, in
milliseconds.
+ * <p>
+ * <b>When to set:</b>
+ * <ul>
+ * <li><b>Stream and Measure writes:</b> This field <i>must</i> be set
to indicate the event time.</li>
+ * <li><b>Trace writes:</b> This field is <i>not needed</i> and should
be left unset; trace data does not require an explicit timestamp here.</li>
+ * </ul>
*/
@Getter
- protected long timestamp;
+ protected Optional<Long> timestamp;
protected final Object[] tags;
protected final MetadataCache.EntityMetadata entityMetadata;
public AbstractWrite(MetadataCache.EntityMetadata entityMetadata, long
timestamp) {
- if (entityMetadata == null) {
- throw new IllegalArgumentException("metadata not found");
- }
- this.entityMetadata = entityMetadata;
- this.timestamp = timestamp;
- this.tags = new Object[this.entityMetadata.getTotalTags()];
+ this(entityMetadata);
+ this.timestamp = Optional.of(timestamp);
}
/**
* Build a write request without initial timestamp.
*/
AbstractWrite(MetadataCache.EntityMetadata entityMetadata) {
- this(entityMetadata, 0);
+ if (entityMetadata == null) {
+ throw new IllegalArgumentException("metadata not found");
+ }
+ this.entityMetadata = entityMetadata;
+ this.tags = new Object[this.entityMetadata.getTotalTags()];
}
public AbstractWrite<P> tag(String tagName,
Serializable<BanyandbModel.TagValue> tagValue) throws BanyanDBException {
@@ -69,19 +72,13 @@ public abstract class AbstractWrite<P extends
com.google.protobuf.GeneratedMessa
}
P build() {
- if (timestamp <= 0) {
- throw new IllegalArgumentException("timestamp is invalid.");
- }
-
BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder()
.setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).setModRevision(entityMetadata.getModRevision()).build();
- Timestamp ts = Timestamp.newBuilder()
- .setSeconds(timestamp / 1000)
- .setNanos((int) (timestamp % 1000 * 1_000_000)).build();
- return build(metadata, ts);
+
+ return build(metadata);
}
- protected abstract P build(BanyandbCommon.Metadata metadata, Timestamp ts);
+ protected abstract P build(BanyandbCommon.Metadata metadata);
@Override
public String toString() {
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 8f8e252..bcae829 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -399,21 +399,6 @@ public class BanyanDBClient implements Closeable {
return new StreamWrite(this.metadataCache.findStreamMetadata(group,
name), elementId);
}
- /**
- * Build a StreamWrite request.
- *
- * @param group the group of the stream
- * @param name the name of the stream
- * @param elementId the primary key of the stream
- * @param timestamp the timestamp of the stream
- * @return the request to be built
- */
- public StreamWrite createStreamWrite(String group, String name, final
String elementId, long timestamp) throws BanyanDBException {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
- Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
- return new StreamWrite(this.metadataCache.findStreamMetadata(group,
name), elementId, timestamp);
- }
-
/**
* Build a trace bulk write processor.
*
@@ -430,20 +415,6 @@ public class BanyanDBClient implements Closeable {
return new TraceBulkWriteProcessor(this, maxBulkSize, flushInterval,
concurrency, timeout, WRITE_HISTOGRAM, options);
}
- /**
- * Build a TraceWrite request.
- *
- * @param group the group of the trace
- * @param name the name of the trace
- * @param timestamp the timestamp of the trace
- * @return the request to be built
- */
- public TraceWrite createTraceWrite(String group, String name, long
timestamp) throws BanyanDBException {
- Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
- Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
- return new TraceWrite(this.metadataCache.findTraceMetadata(group,
name), timestamp);
- }
-
/**
* Build a TraceWrite request without initial timestamp.
*
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
index d405c61..5b16dac 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java
@@ -58,7 +58,14 @@ public class MeasureWrite extends
AbstractWrite<BanyandbMeasure.WriteRequest> {
* @return {@link BanyandbMeasure.WriteRequest} for the bulk process.
*/
@Override
- protected BanyandbMeasure.WriteRequest build(BanyandbCommon.Metadata
metadata, Timestamp ts) {
+ protected BanyandbMeasure.WriteRequest build(BanyandbCommon.Metadata
metadata) {
+ if (!timestamp.isPresent() || timestamp.get() <= 0) {
+ throw new IllegalArgumentException("Timestamp is required and must
be greater than 0 for stream writes.");
+ }
+ Timestamp ts = Timestamp.newBuilder()
+ .setSeconds(timestamp.get() / 1000)
+ .setNanos((int) (timestamp.get() % 1000 * 1_000_000)).build();
+
final BanyandbMeasure.WriteRequest.Builder builder =
BanyandbMeasure.WriteRequest.newBuilder();
builder.setMetadata(metadata);
final BanyandbMeasure.DataPointValue.Builder datapointValueBuilder =
BanyandbMeasure.DataPointValue.newBuilder();
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
index d77b26e..c10d70d 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.banyandb.v1.client;
import com.google.protobuf.Timestamp;
import java.util.Deque;
import java.util.LinkedList;
+import java.util.Optional;
+
import lombok.Getter;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
@@ -40,11 +42,6 @@ public class StreamWrite extends
AbstractWrite<BanyandbStream.WriteRequest> {
@Getter
private final String elementId;
- StreamWrite(MetadataCache.EntityMetadata entityMetadata, final String
elementId, long timestamp) {
- super(entityMetadata, timestamp);
- this.elementId = elementId;
- }
-
/**
* Create a StreamWrite without initial timestamp.
*/
@@ -59,7 +56,7 @@ public class StreamWrite extends
AbstractWrite<BanyandbStream.WriteRequest> {
}
public void setTimestamp(long timestamp) {
- super.timestamp = timestamp;
+ super.timestamp = Optional.of(timestamp);
}
/**
@@ -68,7 +65,15 @@ public class StreamWrite extends
AbstractWrite<BanyandbStream.WriteRequest> {
* @return {@link BanyandbStream.WriteRequest} for the bulk process.
*/
@Override
- protected BanyandbStream.WriteRequest build(BanyandbCommon.Metadata
metadata, Timestamp ts) {
+ protected BanyandbStream.WriteRequest build(BanyandbCommon.Metadata
metadata) {
+ if (!timestamp.isPresent() || timestamp.get() <= 0) {
+ throw new IllegalArgumentException("Timestamp is required and must
be greater than 0 for stream writes.");
+ }
+
+ Timestamp ts = Timestamp.newBuilder()
+ .setSeconds(timestamp.get() / 1000)
+ .setNanos((int) (timestamp.get() % 1000 * 1_000_000)).build();
+
final BanyandbStream.WriteRequest.Builder builder =
BanyandbStream.WriteRequest.newBuilder();
builder.setMetadata(metadata);
final BanyandbStream.ElementValue.Builder elemValBuilder =
BanyandbStream.ElementValue.newBuilder();
diff --git
a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
index d0ce687..036ad77 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceWrite.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.banyandb.v1.client;
import com.google.protobuf.ByteString;
-import com.google.protobuf.Timestamp;
import java.util.ArrayList;
import java.util.List;
@@ -34,7 +33,7 @@ import
org.apache.skywalking.banyandb.v1.client.metadata.Serializable;
/**
* TraceWrite represents a write operation, including necessary fields, for
{@link
- * BanyanDBClient#buildTraceBulkWriteProcessor}.
+ * BanyanDBClient#buildTraceWriteProcessor(int, int, int, int)}.
*/
public class TraceWrite extends AbstractWrite<BanyandbTrace.WriteRequest> {
/**
@@ -49,12 +48,6 @@ public class TraceWrite extends
AbstractWrite<BanyandbTrace.WriteRequest> {
@Getter
private long version;
- TraceWrite(MetadataCache.EntityMetadata entityMetadata, long timestamp) {
- super(entityMetadata, timestamp);
- this.span = ByteString.EMPTY;
- this.version = 1L;
- }
-
/**
* Create a TraceWrite without initial timestamp.
*/
@@ -99,17 +92,13 @@ public class TraceWrite extends
AbstractWrite<BanyandbTrace.WriteRequest> {
return this;
}
- public void setTimestamp(long timestamp) {
- super.timestamp = timestamp;
- }
-
/**
* Build a write request
*
* @return {@link BanyandbTrace.WriteRequest} for the bulk process.
*/
@Override
- protected BanyandbTrace.WriteRequest build(BanyandbCommon.Metadata
metadata, Timestamp ts) {
+ protected BanyandbTrace.WriteRequest build(BanyandbCommon.Metadata
metadata) {
final BanyandbTrace.WriteRequest.Builder builder =
BanyandbTrace.WriteRequest.newBuilder();
builder.setMetadata(metadata);
diff --git
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
index c4f2573..c394138 100644
---
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
+++
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
@@ -94,7 +94,7 @@ public class ITBanyanDBStreamQueryTests extends
BanyanDBClientTestCI {
String dbType = "SQL";
String dbInstance = "127.0.0.1:3306";
- StreamWrite streamWrite = client.createStreamWrite("sw_record",
"trace", segmentId, now.toEpochMilli())
+ StreamWrite streamWrite = client.createStreamWrite("sw_record",
"trace", segmentId)
.tag("data_binary", Value.binaryTagValue(byteData))
.tag("trace_id", Value.stringTagValue(traceId)) // 0
.tag("state", Value.longTagValue(state)) // 1
@@ -109,6 +109,7 @@ public class ITBanyanDBStreamQueryTests extends
BanyanDBClientTestCI {
.tag("mq.broker", Value.stringTagValue(broker)) // 10
.tag("mq.topic", Value.stringTagValue(topic)) // 11
.tag("mq.queue", Value.stringTagValue(queue)); // 12
+ streamWrite.setTimestamp(now.toEpochMilli());
CompletableFuture<Void> f = processor.add(streamWrite);
f.exceptionally(exp -> {
diff --git
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
index cd4d2f0..76eb168 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
@@ -129,7 +129,7 @@ public class ITTraceTest extends BanyanDBClientTestCI {
byte[] spanData = "query-test-span-data".getBytes();
// Create and write trace data
- TraceWrite traceWrite = client.createTraceWrite(groupName, traceName,
now.toEpochMilli())
+ TraceWrite traceWrite = client.createTraceWrite(groupName, traceName)
.tag("trace_id", Value.stringTagValue(traceId))
.tag("span_id", Value.stringTagValue(spanId))
.tag("service_name", Value.stringTagValue(serviceName))
@@ -183,7 +183,7 @@ public class ITTraceTest extends BanyanDBClientTestCI {
Instant baseTime = Instant.now().minusSeconds(60); // Start 1 minute
ago
// Create 3 traces with different timestamps (1 minute apart)
- TraceWrite trace1 = client.createTraceWrite(groupName, traceName,
baseTime.toEpochMilli())
+ TraceWrite trace1 = client.createTraceWrite(groupName, traceName)
.tag("trace_id", Value.stringTagValue(traceId + "1"))
.tag("span_id", Value.stringTagValue("span-1"))
.tag("service_name", Value.stringTagValue(serviceName))
@@ -191,7 +191,7 @@ public class ITTraceTest extends BanyanDBClientTestCI {
.span("span-data-1".getBytes())
.version(1L);
- TraceWrite trace2 = client.createTraceWrite(groupName, traceName,
baseTime.plusSeconds(60).toEpochMilli())
+ TraceWrite trace2 = client.createTraceWrite(groupName, traceName)
.tag("trace_id", Value.stringTagValue(traceId + "2"))
.tag("span_id", Value.stringTagValue("span-2"))
.tag("service_name", Value.stringTagValue(serviceName))
@@ -199,7 +199,7 @@ public class ITTraceTest extends BanyanDBClientTestCI {
.span("span-data-2".getBytes())
.version(1L);
- TraceWrite trace3 = client.createTraceWrite(groupName, traceName,
baseTime.plusSeconds(120).toEpochMilli())
+ TraceWrite trace3 = client.createTraceWrite(groupName, traceName)
.tag("trace_id", Value.stringTagValue(traceId + "3"))
.tag("span_id", Value.stringTagValue("span-3"))
.tag("service_name", Value.stringTagValue(serviceName))