This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch add-write-obs
in repository 
https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git

commit c6955ba4c60156f88bbfd9108f66aa9897fed0a3
Author: Wu Sheng <wu.sh...@foxmail.com>
AuthorDate: Tue Aug 26 16:31:11 2025 -0700

    Add write obs for client.
---
 CHANGES.md                                         |  1 +
 pom.xml                                            |  8 +++-
 .../v1/client/AbstractBulkWriteProcessor.java      |  7 ++-
 .../banyandb/v1/client/BanyanDBClient.java         | 51 +++++++++++++++++++---
 .../v1/client/MeasureBulkWriteProcessor.java       | 38 +++++++++++-----
 .../skywalking/banyandb/v1/client/Options.java     | 16 +++++--
 .../v1/client/StreamBulkWriteProcessor.java        | 21 ++++++++-
 7 files changed, 118 insertions(+), 24 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e368bba..1e19694 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -13,6 +13,7 @@ Release Notes.
 * Add replicas configuration to the API: introduce `replicas` in 
LifecycleStage and ResourceOpts to support high availability.
 * Simplify TLS options: remove unsupported mTLS client certificate settings 
from Options and DefaultChannelFactory; trust CA is still supported.
 * Support auth with username and password.
+* Update gRPC to 1.75.0; add Prometheus write-latency histogram metrics for client write operations.
 
 0.8.0
 ------------------
diff --git a/pom.xml b/pom.xml
index e6e4615..646c2f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
         <!-- core lib dependency -->
         <bytebuddy.version>1.10.19</bytebuddy.version>
         <!-- grpc version should align with the Skywalking main repo -->
-        <grpc.version>1.63.0</grpc.version>
+        <grpc.version>1.75.0</grpc.version>
         <protoc.version>3.25.3</protoc.version>
         <os-maven-plugin.version>1.7.1</os-maven-plugin.version>
         <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -95,6 +95,7 @@
         <!-- necessary for Java 9+ -->
         
<org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>
         <slf4j.version>1.7.36</slf4j.version>
+        <prometheus.client.version>0.16.0</prometheus.client.version>
 
         <!-- Plugin versions -->
         <docker.plugin.version>0.4.13</docker.plugin.version>
@@ -161,6 +162,11 @@
             <artifactId>pgv-java-stub</artifactId>
             <version>${bufbuild.protoc-gen-validate.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.prometheus</groupId>
+            <artifactId>simpleclient</artifactId>
+            <version>${prometheus.client.version}</version>
+        </dependency>
 
         <dependency>
             <groupId>com.google.auto.value</groupId>
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
index d327207..5c66cda 100644
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
+++ 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.banyandb.v1.client;
 import com.google.auto.value.AutoValue;
 import io.grpc.stub.AbstractAsyncStub;
 import io.grpc.stub.StreamObserver;
+import io.prometheus.client.Histogram;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.List;
@@ -134,14 +135,16 @@ public abstract class AbstractBulkWriteProcessor<REQ 
extends com.google.protobuf
 
         final List<Holder> batch = new ArrayList<>(requests.size());
         requests.drainTo(batch);
-        final CompletableFuture<Void> future = doFlush(batch);
+        final CompletableFuture<Void> future = doObservedFlush(batch);
         future.whenComplete((v, t) -> semaphore.release());
         future.join();
         lastFlushTS = System.currentTimeMillis();
 
     }
 
-    protected CompletableFuture<Void> doFlush(final List<Holder> data) {
+    protected abstract CompletableFuture<Void> doObservedFlush(final 
List<Holder> data);
+
+    protected CompletableFuture<Void> doFlush(final List<Holder> data, 
Histogram.Timer timer) {
         // The batch is used to control the completion of the flush operation.
         // There is at most one error per batch,
         // because the database server would terminate the batch process when 
the first error occurs.
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index 7d3ddd0..b238d9c 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -27,6 +27,7 @@ import io.grpc.ClientInterceptors;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.stub.StreamObserver;
+import io.prometheus.client.Histogram;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import lombok.AccessLevel;
@@ -101,6 +102,7 @@ import static 
com.google.common.base.Preconditions.checkState;
 @Slf4j
 public class BanyanDBClient implements Closeable {
     public static final ZonedDateTime DEFAULT_EXPIRE_AT = 
ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
+    private static Histogram WRITE_HISTOGRAM;
     private final String[] targets;
     /**
      * Options for server connection.
@@ -146,6 +148,16 @@ public class BanyanDBClient implements Closeable {
      */
     private final MetadataCache metadataCache;
 
+    static {
+        // init prometheus metric
+        WRITE_HISTOGRAM = Histogram.build()
+                                   .name("banyandb_write_latency_seconds")
+                                   .help("BanyanDB Bulk Write latency in 
seconds")
+                                   .buckets(0.005, 0.01, 0.025, 0.05, 0.1, 
0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
+                                   .labelNames("catalog", "operation", 
"instanceID")
+                                   .register();
+    }
+
     /**
      * Create a BanyanDB client instance with a default options.
      *
@@ -234,6 +246,13 @@ public class BanyanDBClient implements Closeable {
     public CompletableFuture<Void> write(StreamWrite streamWrite) {
         checkState(this.streamServiceStub != null, "stream service is null");
 
+        Histogram.Timer timer
+            = WRITE_HISTOGRAM.labels(
+                                "stream",
+                                "single_write", // single write for non-bulk 
operation.
+                                
options.getPrometheusMetricsOpts().getClientID()
+                            )
+                             .startTimer();
         CompletableFuture<Void> future = new CompletableFuture<>();
         final StreamObserver<BanyandbStream.WriteRequest> 
writeRequestStreamObserver
                 = this.streamServiceStub
@@ -279,12 +298,14 @@ public class BanyanDBClient implements Closeable {
 
                             @Override
                             public void onError(Throwable throwable) {
+                                timer.observeDuration();
                                 log.error("Error occurs in flushing streams.", 
throwable);
                                 future.completeExceptionally(throwable);
                             }
 
                             @Override
                             public void onCompleted() {
+                                timer.observeDuration();
                                 if (responseException == null) {
                                     future.complete(null);
                                 } else {
@@ -313,7 +334,7 @@ public class BanyanDBClient implements Closeable {
     public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, 
int flushInterval, int concurrency, int timeout) {
         checkState(this.streamServiceStub != null, "stream service is null");
 
-        return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout);
+        return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout, WRITE_HISTOGRAM, options);
     }
 
     /**
@@ -329,7 +350,7 @@ public class BanyanDBClient implements Closeable {
     public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int 
maxBulkSize, int flushInterval, int concurrency, int timeout) {
         checkState(this.measureServiceStub != null, "measure service is null");
 
-        return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout);
+        return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout, WRITE_HISTOGRAM, options);
     }
 
     /**
@@ -827,7 +848,7 @@ public class BanyanDBClient implements Closeable {
 
     /**
      * Define a new property.
-     * 
+     *
     * @param property the property to be stored in the BanyanDB
      * @throws BanyanDBException if the property is invalid
      */
@@ -889,7 +910,13 @@ public class BanyanDBClient implements Closeable {
      */
     public ApplyResponse apply(Property property) throws BanyanDBException {
         PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        return store.apply(property);
+        try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
+            "property",
+            "single_write",
+            options.getPrometheusMetricsOpts().getClientID()
+        ).startTimer()) {
+            return store.apply(property);
+        }
     }
 
     /**
@@ -901,7 +928,13 @@ public class BanyanDBClient implements Closeable {
     public ApplyResponse apply(Property property, Strategy strategy) throws
             BanyanDBException {
         PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        return store.apply(property, strategy);
+        try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
+            "property",
+            "single_write",
+            options.getPrometheusMetricsOpts().getClientID()
+        ).startTimer()) {
+            return store.apply(property, strategy);
+        }
     }
 
     /**
@@ -926,7 +959,13 @@ public class BanyanDBClient implements Closeable {
     public DeleteResponse deleteProperty(String group, String name, String id) 
throws
             BanyanDBException {
         PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        return store.delete(group, name, id);
+        try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
+            "property",
+            "delete",
+            options.getPrometheusMetricsOpts().getClientID()
+        ).startTimer()) {
+            return store.delete(group, name, id);
+        }
     }
 
     /**
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
index fbc71a7..ce17e66 100644
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
+++ 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
@@ -19,6 +19,8 @@
 package org.apache.skywalking.banyandb.v1.client;
 
 import io.grpc.stub.StreamObserver;
+import io.prometheus.client.Histogram;
+import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
@@ -42,25 +44,29 @@ import java.util.concurrent.CompletableFuture;
 public class MeasureBulkWriteProcessor extends 
AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest,
         MeasureServiceGrpc.MeasureServiceStub> {
     private final BanyanDBClient client;
+    private final Histogram writeHistogram;
+    private final Options options;
 
     /**
      * Create the processor.
      *
-     * @param client        the client
-     * @param maxBulkSize   the max bulk size for the flush operation
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second.
-     * @param timeout       network timeout threshold in seconds.
-     * @param concurrency   the number of concurrency would run for the flush 
max.
+     * @param client         the client
+     * @param maxBulkSize    the max bulk size for the flush operation
+     * @param flushInterval  if given maxBulkSize is not reached in this 
period, the flush would be triggered
+     *                       automatically. Unit is second.
+     * @param concurrency    the number of concurrency would run for the flush 
max.
+     * @param timeout        network timeout threshold in seconds.
      */
     protected MeasureBulkWriteProcessor(
-            final BanyanDBClient client,
-            final int maxBulkSize,
-            final int flushInterval,
-            final int concurrency,
-            final int timeout) {
+        final BanyanDBClient client,
+        final int maxBulkSize,
+        final int flushInterval,
+        final int concurrency,
+        final int timeout, final Histogram writeHistogram, final Options 
options) {
         super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency, timeout);
         this.client = client;
+        this.writeHistogram = writeHistogram;
+        this.options = options;
     }
 
     @Override
@@ -105,4 +111,14 @@ public class MeasureBulkWriteProcessor extends 
AbstractBulkWriteProcessor<Banyan
             }
         });
     }
+
+    @Override
+    protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) 
{
+        Histogram.Timer timer = writeHistogram.labels(
+            "measure",
+            "bulk_write",
+            options.getPrometheusMetricsOpts().getClientID()
+        ).startTimer();
+        return super.doFlush(data, timer);
+    }
 }
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
index 5355168..8bf50e4 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java
@@ -63,14 +63,24 @@ public class Options {
      * Basic Auth: password of BanyanDB server
      */
     private String password = "";
+    /**
+     * Enable Prometheus metrics
+     */
+    private PrometheusMetricsOpts prometheusMetricsOpts = new 
PrometheusMetricsOpts();
 
     public Options() {
     }
 
     ChannelManagerSettings buildChannelManagerSettings() {
         return ChannelManagerSettings.newBuilder()
-                .setRefreshInterval(this.refreshInterval)
-                .setForceReconnectionThreshold(this.forceReconnectionThreshold)
-                .build();
+                                     .setRefreshInterval(this.refreshInterval)
+                                     
.setForceReconnectionThreshold(this.forceReconnectionThreshold)
+                                     .build();
+    }
+
+    public static class PrometheusMetricsOpts {
+        @Setter(AccessLevel.PUBLIC)
+        @Getter
+        private String clientID = "default";
     }
 }
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
index ce2ab61..01f042a 100644
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
+++ 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
@@ -20,6 +20,8 @@ package org.apache.skywalking.banyandb.v1.client;
 
 import io.grpc.stub.StreamObserver;
 
+import io.prometheus.client.Histogram;
+import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
@@ -43,6 +45,8 @@ import java.util.concurrent.CompletableFuture;
 public class StreamBulkWriteProcessor extends 
AbstractBulkWriteProcessor<BanyandbStream.WriteRequest,
         StreamServiceGrpc.StreamServiceStub> {
     private final BanyanDBClient client;
+    private final Histogram writeHistogram;
+    private final Options options;
 
     /**
      * Create the processor.
@@ -59,9 +63,13 @@ public class StreamBulkWriteProcessor extends 
AbstractBulkWriteProcessor<Banyand
             final int maxBulkSize,
             final int flushInterval,
             final int concurrency,
-            final int timeout) {
+            final int timeout,
+            final Histogram writeHistogram,
+            final Options options) {
         super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency, timeout);
         this.client = client;
+        this.writeHistogram = writeHistogram;
+        this.options = options;
     }
 
     @Override
@@ -106,4 +114,15 @@ public class StreamBulkWriteProcessor extends 
AbstractBulkWriteProcessor<Banyand
                     }
                 });
     }
+
+    @Override
+    protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) 
{
+        Histogram.Timer timer = writeHistogram.labels(
+            "stream",
+            "bulk_write",
+            options.getPrometheusMetricsOpts().getClientID()
+        ).startTimer();
+        return super.doFlush(data, timer);
+    }
 }
+

Reply via email to