This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch add-write-obs in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
commit c6955ba4c60156f88bbfd9108f66aa9897fed0a3 Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Tue Aug 26 16:31:11 2025 -0700 Add write obs for client. --- CHANGES.md | 1 + pom.xml | 8 +++- .../v1/client/AbstractBulkWriteProcessor.java | 7 ++- .../banyandb/v1/client/BanyanDBClient.java | 51 +++++++++++++++++++--- .../v1/client/MeasureBulkWriteProcessor.java | 38 +++++++++++----- .../skywalking/banyandb/v1/client/Options.java | 16 +++++-- .../v1/client/StreamBulkWriteProcessor.java | 21 ++++++++- 7 files changed, 118 insertions(+), 24 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e368bba..1e19694 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,6 +13,7 @@ Release Notes. * Add replicas configuration to the API: introduce `replicas` in LifecycleStage and ResourceOpts to support high availability. * Simplify TLS options: remove unsupported mTLS client certificate settings from Options and DefaultChannelFactory; trust CA is still supported. * Support auth with username and password. +* Update gRPC to 1.75.0. 0.8.0 ------------------ diff --git a/pom.xml b/pom.xml index e6e4615..646c2f7 100644 --- a/pom.xml +++ b/pom.xml @@ -82,7 +82,7 @@ <!-- core lib dependency --> <bytebuddy.version>1.10.19</bytebuddy.version> <!-- grpc version should align with the Skywalking main repo --> - <grpc.version>1.63.0</grpc.version> + <grpc.version>1.75.0</grpc.version> <protoc.version>3.25.3</protoc.version> <os-maven-plugin.version>1.7.1</os-maven-plugin.version> <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version> @@ -95,6 +95,7 @@ <!-- necessary for Java 9+ --> <org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version> <slf4j.version>1.7.36</slf4j.version> + <prometheus.client.version>0.16.0</prometheus.client.version> <!-- Plugin versions --> <docker.plugin.version>0.4.13</docker.plugin.version> @@ -161,6 +162,11 @@ <artifactId>pgv-java-stub</artifactId> <version>${bufbuild.protoc-gen-validate.version}</version> </dependency> + <dependency> + <groupId>io.prometheus</groupId> + <artifactId>simpleclient</artifactId> + <version>${prometheus.client.version}</version> + </dependency> <dependency> <groupId>com.google.auto.value</groupId> diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java index d327207..5c66cda 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java @@ -21,6 +21,7 @@ package org.apache.skywalking.banyandb.v1.client; import com.google.auto.value.AutoValue; import io.grpc.stub.AbstractAsyncStub; import io.grpc.stub.StreamObserver; +import io.prometheus.client.Histogram; import java.io.Closeable; import java.util.ArrayList; import java.util.List; @@ -134,14 +135,16 @@ public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf final List<Holder> batch = new ArrayList<>(requests.size()); requests.drainTo(batch); - final CompletableFuture<Void> future = doFlush(batch); + final CompletableFuture<Void> future = doObservedFlush(batch); future.whenComplete((v, t) -> semaphore.release()); future.join(); lastFlushTS = System.currentTimeMillis(); } - protected CompletableFuture<Void> doFlush(final List<Holder> data) { + protected abstract CompletableFuture<Void> doObservedFlush(final List<Holder> data); + + protected CompletableFuture<Void> doFlush(final List<Holder> data, Histogram.Timer timer) { // The batch is used to control the completion of the flush operation. // There is at most one error per batch, // because the database server would terminate the batch process when the first error occurs. diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java index 7d3ddd0..b238d9c 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java @@ -27,6 +27,7 @@ import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.stub.StreamObserver; +import io.prometheus.client.Histogram; import java.time.ZoneOffset; import java.time.ZonedDateTime; import lombok.AccessLevel; @@ -101,6 +102,7 @@ import static com.google.common.base.Preconditions.checkState; @Slf4j public class BanyanDBClient implements Closeable { public static final ZonedDateTime DEFAULT_EXPIRE_AT = ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); + private static Histogram WRITE_HISTOGRAM; private final String[] targets; /** * Options for server connection. @@ -146,6 +148,16 @@ public class BanyanDBClient implements Closeable { */ private final MetadataCache metadataCache; + static { + // init prometheus metric + WRITE_HISTOGRAM = Histogram.build() + .name("banyandb_write_latency_seconds") + .help("BanyanDB Bulk Write latency in seconds") + .buckets(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0) + .labelNames("catalog", "operation", "instanceID") + .register(); + } + /** * Create a BanyanDB client instance with a default options. * @@ -234,6 +246,13 @@ public class BanyanDBClient implements Closeable { public CompletableFuture<Void> write(StreamWrite streamWrite) { checkState(this.streamServiceStub != null, "stream service is null"); + Histogram.Timer timer + = WRITE_HISTOGRAM.labels( + "stream", + "single_write", // single write for non-bulk operation. + options.getPrometheusMetricsOpts().getClientID() + ) + .startTimer(); CompletableFuture<Void> future = new CompletableFuture<>(); final StreamObserver<BanyandbStream.WriteRequest> writeRequestStreamObserver = this.streamServiceStub @@ -279,12 +298,14 @@ public class BanyanDBClient implements Closeable { @Override public void onError(Throwable throwable) { + timer.observeDuration(); log.error("Error occurs in flushing streams.", throwable); future.completeExceptionally(throwable); } @Override public void onCompleted() { + timer.observeDuration(); if (responseException == null) { future.complete(null); } else { @@ -313,7 +334,7 @@ public class BanyanDBClient implements Closeable { public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { checkState(this.streamServiceStub != null, "stream service is null"); - return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout); + return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options); } /** @@ -329,7 +350,7 @@ public class BanyanDBClient implements Closeable { public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { checkState(this.measureServiceStub != null, "measure service is null"); - return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout); + return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options); } /** @@ -827,7 +848,7 @@ public class BanyanDBClient implements Closeable { /** * Define a new property. - * + * * @param property the property to be stored in the BanyanBD * @throws BanyanDBException if the property is invalid */ @@ -889,7 +910,13 @@ public class BanyanDBClient implements Closeable { */ public ApplyResponse apply(Property property) throws BanyanDBException { PropertyStore store = new PropertyStore(checkNotNull(this.channel)); - return store.apply(property); + try (Histogram.Timer timer = WRITE_HISTOGRAM.labels( + "property", + "single_write", + options.getPrometheusMetricsOpts().getClientID() + ).startTimer()) { + return store.apply(property); + } } /** @@ -901,7 +928,13 @@ public class BanyanDBClient implements Closeable { public ApplyResponse apply(Property property, Strategy strategy) throws BanyanDBException { PropertyStore store = new PropertyStore(checkNotNull(this.channel)); - return store.apply(property, strategy); + try (Histogram.Timer timer = WRITE_HISTOGRAM.labels( + "property", + "single_write", + options.getPrometheusMetricsOpts().getClientID() + ).startTimer()) { + return store.apply(property, strategy); + } } /** @@ -926,7 +959,13 @@ public class BanyanDBClient implements Closeable { public DeleteResponse deleteProperty(String group, String name, String id) throws BanyanDBException { PropertyStore store = new PropertyStore(checkNotNull(this.channel)); - return store.delete(group, name, id); + try (Histogram.Timer timer = WRITE_HISTOGRAM.labels( + "property", + "delete", + options.getPrometheusMetricsOpts().getClientID() + ).startTimer()) { + return store.delete(group, name, id); + } } /** diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java index fbc71a7..ce17e66 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java @@ -19,6 +19,8 @@ package org.apache.skywalking.banyandb.v1.client; import io.grpc.stub.StreamObserver; +import io.prometheus.client.Histogram; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; @@ -42,25 +44,29 @@ import java.util.concurrent.CompletableFuture; public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest, MeasureServiceGrpc.MeasureServiceStub> { private final BanyanDBClient client; + private final Histogram writeHistogram; + private final Options options; /** * Create the processor. * - * @param client the client - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param timeout network timeout threshold in seconds. - * @param concurrency the number of concurrency would run for the flush max. + * @param client the client + * @param maxBulkSize the max bulk size for the flush operation + * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger + * automatically. Unit is second. + * @param concurrency the number of concurrency would run for the flush max. + * @param timeout network timeout threshold in seconds. */ protected MeasureBulkWriteProcessor( - final BanyanDBClient client, - final int maxBulkSize, - final int flushInterval, - final int concurrency, - final int timeout) { + final BanyanDBClient client, + final int maxBulkSize, + final int flushInterval, + final int concurrency, + final int timeout, final Histogram writeHistogram, final Options options) { super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout); this.client = client; + this.writeHistogram = writeHistogram; + this.options = options; } @Override @@ -105,4 +111,14 @@ public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<Banyan } }); } + + @Override + protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) { + Histogram.Timer timer = writeHistogram.labels( + "measure", + "bulk_write", + options.getPrometheusMetricsOpts().getClientID() + ).startTimer(); + return super.doFlush(data, timer); + } } diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java index 5355168..8bf50e4 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java @@ -63,14 +63,24 @@ public class Options { * Basic Auth: password of BanyanDB server */ private String password = ""; + /** + * Enable Prometheus metrics + */ + private PrometheusMetricsOpts prometheusMetricsOpts = new PrometheusMetricsOpts(); public Options() { } ChannelManagerSettings buildChannelManagerSettings() { return ChannelManagerSettings.newBuilder() - .setRefreshInterval(this.refreshInterval) - .setForceReconnectionThreshold(this.forceReconnectionThreshold) - .build(); + .setRefreshInterval(this.refreshInterval) + .setForceReconnectionThreshold(this.forceReconnectionThreshold) + .build(); + } + + public static class PrometheusMetricsOpts { + @Setter(AccessLevel.PUBLIC) + @Getter + private String clientID = "default"; } } diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java index ce2ab61..01f042a 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java @@ -20,6 +20,8 @@ package org.apache.skywalking.banyandb.v1.client; import io.grpc.stub.StreamObserver; +import io.prometheus.client.Histogram; +import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; @@ -43,6 +45,8 @@ import java.util.concurrent.CompletableFuture; public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbStream.WriteRequest, StreamServiceGrpc.StreamServiceStub> { private final BanyanDBClient client; + private final Histogram writeHistogram; + private final Options options; /** * Create the processor. @@ -59,9 +63,13 @@ public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<Banyand final int maxBulkSize, final int flushInterval, final int concurrency, - final int timeout) { + final int timeout, + final Histogram writeHistogram, + final Options options) { super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout); this.client = client; + this.writeHistogram = writeHistogram; + this.options = options; } @Override @@ -106,4 +114,15 @@ public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<Banyand } }); } + + @Override + protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) { + Histogram.Timer timer = writeHistogram.labels( + "stream", + "bulk_write", + options.getPrometheusMetricsOpts().getClientID() + ).startTimer(); + return super.doFlush(data, timer); + } } +