This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feat/trace-response in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
commit 8488614a5923f1a543d325f4acc3fbf6a1c24fbf Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Tue Sep 9 20:42:03 2025 +0800 Restore changes Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- .../banyandb/v1/client/BanyanDBClient.java | 29 ++++++---------------- .../skywalking/banyandb/v1/client/StreamWrite.java | 10 -------- .../v1/client/ITBanyanDBStreamQueryTests.java | 3 ++- 3 files changed, 9 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java index 0cd45bb..bcae829 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java @@ -400,30 +400,15 @@ public class BanyanDBClient implements Closeable { } /** - * Build a StreamWrite request. + * Build a trace bulk write processor. * - * @param group the group of the stream - * @param name the name of the stream - * @param elementId the primary key of the stream - * @param timestamp the timestamp of the stream - * @return the request to be built + * @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array. + * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger + * automatically. Unit is second. + * @param concurrency the number of concurrency would run for the flush max. + * @param timeout network timeout threshold in seconds. + * @return trace bulk write processor */ - public StreamWrite createStreamWrite(String group, String name, final String elementId, long timestamp) throws BanyanDBException { - Preconditions.checkArgument(!Strings.isNullOrEmpty(group)); - Preconditions.checkArgument(!Strings.isNullOrEmpty(name)); - return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId, timestamp); - } - - /** - * Build a trace bulk write processor. - * - * @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array. - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param concurrency the number of concurrency would run for the flush max. - * @param timeout network timeout threshold in seconds. - * @return trace bulk write processor - */ public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { checkState(this.traceServiceStub != null, "trace service is null"); diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java index ad84b3e..c10d70d 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java @@ -50,16 +50,6 @@ public class StreamWrite extends AbstractWrite<BanyandbStream.WriteRequest> { this.elementId = elementId; } - /** - * Create a StreamWrite with initial timestamp. - * - * @param timestamp in milliseconds - */ - public StreamWrite(MetadataCache.EntityMetadata streamMetadata, String elementId, long timestamp) { - this(streamMetadata, elementId); - this.timestamp = Optional.of(timestamp); - } - @Override public StreamWrite tag(String tagName, Serializable<BanyandbModel.TagValue> tagValue) throws BanyanDBException { return (StreamWrite) super.tag(tagName, tagValue); diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java index c4f2573..c394138 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java @@ -94,7 +94,7 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { String dbType = "SQL"; String dbInstance = "127.0.0.1:3306"; - StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId, now.toEpochMilli()) + StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId) .tag("data_binary", Value.binaryTagValue(byteData)) .tag("trace_id", Value.stringTagValue(traceId)) // 0 .tag("state", Value.longTagValue(state)) // 1 @@ -109,6 +109,7 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { .tag("mq.broker", Value.stringTagValue(broker)) // 10 .tag("mq.topic", Value.stringTagValue(topic)) // 11 .tag("mq.queue", Value.stringTagValue(queue)); // 12 + streamWrite.setTimestamp(now.toEpochMilli()); CompletableFuture<Void> f = processor.add(streamWrite); f.exceptionally(exp -> {