This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git


The following commit(s) were added to refs/heads/main by this push:
     new 35eb8b8  Move write logic to OAP Server. (#101)
35eb8b8 is described below

commit 35eb8b8a48f8c5961a9754daa37d1037e8c024bf
Author: Wan Kai <[email protected]>
AuthorDate: Mon Oct 13 16:35:46 2025 +0800

    Move write logic to OAP Server. (#101)
---
 .../v1/client/AbstractBulkWriteProcessor.java      | 204 -------------------
 .../banyandb/v1/client/AbstractWrite.java          |   3 +-
 .../banyandb/v1/client/BanyanDBClient.java         | 225 +--------------------
 .../v1/client/MeasureBulkWriteProcessor.java       | 124 ------------
 .../v1/client/StreamBulkWriteProcessor.java        | 128 ------------
 .../v1/client/TraceBulkWriteProcessor.java         | 127 ------------
 .../v1/client/ITBanyanDBMeasureQueryTests.java     |  35 +++-
 .../v1/client/ITBanyanDBPropertyTests.java         |  25 ++-
 .../v1/client/ITBanyanDBStreamQueryTests.java      |  35 ++--
 .../skywalking/banyandb/v1/client/ITTraceTest.java |  71 +++++--
 10 files changed, 121 insertions(+), 856 deletions(-)

diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
deleted file mode 100644
index d850cd1..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import com.google.auto.value.AutoValue;
-import io.grpc.stub.AbstractAsyncStub;
-import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Histogram;
-import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public abstract class AbstractBulkWriteProcessor<REQ extends 
com.google.protobuf.GeneratedMessageV3,
-    STUB extends AbstractAsyncStub<STUB>>
-    implements Runnable, Closeable {
-    private final STUB stub;
-    private final int maxBulkSize;
-    private final int flushInterval;
-    private final ArrayBlockingQueue<Holder> requests;
-    private final Semaphore semaphore;
-    private final long flushInternalInMillis;
-    private final ScheduledThreadPoolExecutor scheduler;
-    private final int timeout;
-    private volatile long lastFlushTS = 0;
-
-    /**
-     * Create the processor.
-     *
-     * @param stub          an implementation of {@link AbstractAsyncStub}
-     * @param processorName name of the processor for logging
-     * @param maxBulkSize   the max bulk size for the flush operation
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second.
-     * @param concurrency   the number of concurrency would run for the flush 
max.
-     * @param timeout       network timeout threshold in seconds.
-     */
-    protected AbstractBulkWriteProcessor(STUB stub,
-                                         String processorName,
-                                         int maxBulkSize,
-                                         int flushInterval,
-                                         int concurrency,
-                                         int timeout) {
-        this.stub = stub;
-        this.maxBulkSize = maxBulkSize;
-        this.flushInterval = flushInterval;
-        this.timeout = timeout;
-        requests = new ArrayBlockingQueue<>(maxBulkSize + 1);
-        this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1);
-
-        scheduler = new ScheduledThreadPoolExecutor(1, r -> {
-            final Thread thread = new Thread(r);
-            thread.setName("BanyanDB BulkProcessor");
-            return thread;
-        });
-        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-        scheduler.setRemoveOnCancelPolicy(true);
-        flushInternalInMillis = flushInterval * 1000;
-        scheduler.scheduleWithFixedDelay(
-            this, 0, flushInterval, TimeUnit.SECONDS);
-    }
-
-    /**
-     * Add the measure to the bulk processor.
-     *
-     * @param writeEntity to add.
-     */
-    @SneakyThrows
-    public CompletableFuture<Void> add(AbstractWrite<REQ> writeEntity) {
-        final CompletableFuture<Void> f = new CompletableFuture<>();
-        requests.put(Holder.create(writeEntity, f));
-        flushIfNeeded();
-        return f;
-    }
-
-    public void run() {
-        try {
-            doPeriodicalFlush();
-        } catch (Throwable t) {
-            log.error("Failed to flush data to BanyanDB", t);
-        }
-    }
-
-    @SneakyThrows
-    protected void flushIfNeeded() {
-        if (requests.size() >= maxBulkSize) {
-            flush();
-        }
-    }
-
-    private void doPeriodicalFlush() {
-        if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 
2) {
-            // Run periodical flush if there is no `flushIfNeeded` executed in 
the second half of the flush period.
-            // Otherwise, wait for the next round. By default, the last 2 
seconds of the 5s period.
-            // This could avoid periodical flush running among 
bulks(controlled by bulkActions).
-            flush();
-        }
-    }
-
-    public void flush() {
-        if (requests.isEmpty()) {
-            return;
-        }
-
-        try {
-            semaphore.acquire();
-        } catch (InterruptedException e) {
-            log.error("Interrupted when trying to get semaphore to execute 
bulk requests", e);
-            return;
-        }
-
-        final List<Holder> batch = new ArrayList<>(requests.size());
-        requests.drainTo(batch);
-        final CompletableFuture<Void> future = doObservedFlush(batch);
-        future.whenComplete((v, t) -> semaphore.release());
-        future.join();
-        lastFlushTS = System.currentTimeMillis();
-
-    }
-
-    protected abstract CompletableFuture<Void> doObservedFlush(final 
List<Holder> data);
-
-    protected CompletableFuture<Void> doFlush(final List<Holder> data, 
Histogram.Timer timer) {
-        // The batch is used to control the completion of the flush operation.
-        // There is at most one error per batch,
-        // because the database server would terminate the batch process when 
the first error occurs.
-        final CompletableFuture<Void> batch = new CompletableFuture<>();
-        final StreamObserver<REQ> writeRequestStreamObserver
-            = this.buildStreamObserver(stub.withDeadlineAfter(timeout, 
TimeUnit.SECONDS), batch);
-
-        try {
-            data.forEach(h -> {
-                AbstractWrite<REQ> entity = (AbstractWrite<REQ>) 
h.writeEntity();
-                REQ request;
-                try {
-                    request = entity.build();
-                } catch (Throwable bt) {
-                    log.error("building the entity fails: {}", 
entity.toString(), bt);
-                    h.future().completeExceptionally(bt);
-                    return;
-                }
-                writeRequestStreamObserver.onNext(request);
-                h.future().complete(null);
-            });
-        } finally {
-            writeRequestStreamObserver.onCompleted();
-        }
-        batch.whenComplete((ignored, exp) -> {
-            timer.observeDuration();
-            if (exp != null) {
-                log.error("Failed to execute requests in bulk", exp);
-            }
-        });
-        return batch;
-    }
-
-    public void close() {
-        scheduler.shutdownNow();
-    }
-
-    protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub, 
CompletableFuture<Void> batch);
-
-    @AutoValue
-    static abstract class Holder {
-        abstract AbstractWrite writeEntity();
-
-        abstract CompletableFuture<Void> future();
-
-        public static <REQ extends com.google.protobuf.GeneratedMessageV3> 
Holder create(AbstractWrite<REQ> writeEntity,
-                                                                               
          CompletableFuture<Void> future) {
-            future.whenComplete((v, t) -> {
-                if (t != null) {
-                    log.error("Failed to execute the request: {}", 
writeEntity.toString(), t);
-                }
-            });
-            return new 
AutoValue_AbstractBulkWriteProcessor_Holder(writeEntity, future);
-        }
-
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
index bae244d..0f29c45 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java
@@ -44,6 +44,7 @@ public abstract class AbstractWrite<P extends 
com.google.protobuf.GeneratedMessa
 
     protected final Object[] tags;
 
+    @Getter
     protected final MetadataCache.EntityMetadata entityMetadata;
 
     public AbstractWrite(MetadataCache.EntityMetadata entityMetadata, long 
timestamp) {
@@ -71,7 +72,7 @@ public abstract class AbstractWrite<P extends 
com.google.protobuf.GeneratedMessa
         return this;
     }
 
-    P build() {
+    public P build() {
         BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder()
                 
.setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).setModRevision(entityMetadata.getModRevision()).build();
 
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
index bcae829..d9505c1 100644
--- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
+++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java
@@ -26,11 +26,8 @@ import io.grpc.Channel;
 import io.grpc.ClientInterceptors;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
-import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Histogram;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
-import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
@@ -44,12 +41,7 @@ import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
 import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Subject;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Trace;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
-import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
-import 
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy;
-import 
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyResponse;
-import 
org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse;
 import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
 import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
 import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
@@ -61,8 +53,6 @@ import 
org.apache.skywalking.banyandb.v1.client.grpc.HandleExceptionsWith;
 import org.apache.skywalking.banyandb.v1.client.grpc.channel.ChannelManager;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.channel.DefaultChannelFactory;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.InternalException;
-import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidArgumentException;
 import org.apache.skywalking.banyandb.v1.client.metadata.GroupMetadataRegistry;
 import 
org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleBindingMetadataRegistry;
 import 
org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleMetadataRegistry;
@@ -81,12 +71,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
 import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -106,47 +94,46 @@ import static 
com.google.common.base.Preconditions.checkState;
 @Slf4j
 public class BanyanDBClient implements Closeable {
     public static final ZonedDateTime DEFAULT_EXPIRE_AT = 
ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
-    private static Histogram WRITE_HISTOGRAM;
     private final String[] targets;
     /**
      * Options for server connection.
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private final Options options;
     /**
      * gRPC connection.
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private volatile Channel channel;
     /**
      * gRPC client stub
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private StreamServiceGrpc.StreamServiceStub streamServiceStub;
     /**
      * gRPC client stub
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private MeasureServiceGrpc.MeasureServiceStub measureServiceStub;
     /**
      * gRPC client stub
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private TraceServiceGrpc.TraceServiceStub traceServiceStub;
     /**
      * gRPC future stub.
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private StreamServiceGrpc.StreamServiceBlockingStub 
streamServiceBlockingStub;
     /**
      * gRPC future stub.
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private MeasureServiceGrpc.MeasureServiceBlockingStub 
measureServiceBlockingStub;
     /**
      * gRPC future stub.
      */
-    @Getter(value = AccessLevel.PACKAGE)
+    @Getter
     private TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
     /**
      * The connection status.
@@ -162,16 +149,6 @@ public class BanyanDBClient implements Closeable {
      */
     private final MetadataCache metadataCache;
 
-    static {
-        // init prometheus metric
-        WRITE_HISTOGRAM = Histogram.build()
-                                   .name("banyandb_write_latency_seconds")
-                                   .help("BanyanDB Bulk Write latency in 
seconds")
-                                   .buckets(0.005, 0.01, 0.025, 0.05, 0.1, 
0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
-                                   .labelNames("catalog", "operation", 
"instanceID")
-                                   .register();
-    }
-
     /**
      * Create a BanyanDB client instance with a default options.
      *
@@ -255,122 +232,6 @@ public class BanyanDBClient implements Closeable {
         }
     }
 
-    /**
-     * Perform a single write with given entity.
-     *
-     * @param streamWrite the entity to be written
-     * @return a future of write result
-     */
-    public CompletableFuture<Void> write(StreamWrite streamWrite) {
-        checkState(this.streamServiceStub != null, "stream service is null");
-
-        Histogram.Timer timer
-            = WRITE_HISTOGRAM.labels(
-                                "stream",
-                                "single_write", // single write for non-bulk 
operation.
-                                
options.getPrometheusMetricsOpts().getClientID()
-                            )
-                             .startTimer();
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        final StreamObserver<BanyandbStream.WriteRequest> 
writeRequestStreamObserver
-                = this.streamServiceStub
-                .withDeadlineAfter(this.getOptions().getDeadline(), 
TimeUnit.SECONDS)
-                .write(
-                        new StreamObserver<BanyandbStream.WriteResponse>() {
-                            private BanyanDBException responseException;
-
-                            @Override
-                            public void onNext(BanyandbStream.WriteResponse 
writeResponse) {
-                                BanyandbModel.Status status = 
StatusUtil.convertStringToStatus(writeResponse.getStatus());
-                                switch (status) {
-                                    case STATUS_SUCCEED:
-                                        break;
-                                    case STATUS_INVALID_TIMESTAMP:
-                                        responseException = new 
InvalidArgumentException(
-                                                "Invalid timestamp: " + 
streamWrite.getTimestamp(), null, Status.Code.INVALID_ARGUMENT, false);
-                                        break;
-                                    case STATUS_NOT_FOUND:
-                                        responseException = new 
InvalidArgumentException(
-                                                "Invalid metadata: " + 
streamWrite.entityMetadata, null, Status.Code.INVALID_ARGUMENT, false);
-                                        break;
-                                    case STATUS_EXPIRED_SCHEMA:
-                                        BanyandbCommon.Metadata metadata = 
writeResponse.getMetadata();
-                                        log.warn("The schema {}.{} is expired, 
trying update the schema...",
-                                                metadata.getGroup(), 
metadata.getName());
-                                        try {
-                                            
BanyanDBClient.this.updateStreamMetadataCacheFromSever(metadata.getGroup(), 
metadata.getName());
-                                        } catch (BanyanDBException e) {
-                                            String warnMessage = 
String.format("Failed to refresh the stream schema %s.%s",
-                                                    metadata.getGroup(), 
metadata.getName());
-                                            log.warn(warnMessage, e);
-                                        }
-                                        responseException = new 
InvalidArgumentException(
-                                                "Expired revision: " + 
metadata.getModRevision(), null, Status.Code.INVALID_ARGUMENT, true);
-                                        break;
-                                    default:
-                                        responseException = new 
InternalException(
-                                              String.format("Internal error 
(%s) occurs in server", writeResponse.getStatus()), null, Status.Code.INTERNAL, 
true);
-                                        break;
-                                }
-                            }
-
-                            @Override
-                            public void onError(Throwable throwable) {
-                                timer.observeDuration();
-                                log.error("Error occurs in flushing streams.", 
throwable);
-                                future.completeExceptionally(throwable);
-                            }
-
-                            @Override
-                            public void onCompleted() {
-                                timer.observeDuration();
-                                if (responseException == null) {
-                                    future.complete(null);
-                                } else {
-                                    
future.completeExceptionally(responseException);
-                                }
-                            }
-                        });
-        try {
-            writeRequestStreamObserver.onNext(streamWrite.build());
-        } finally {
-            writeRequestStreamObserver.onCompleted();
-        }
-        return future;
-    }
-
-    /**
-     * Create a build process for stream write.
-     *
-     * @param maxBulkSize   the max bulk size for the flush operation
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second
-     * @param concurrency   the number of concurrency would run for the flush 
max
-     * @param timeout       network timeout threshold in seconds.
-     * @return stream bulk write processor
-     */
-    public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, 
int flushInterval, int concurrency, int timeout) {
-        checkState(this.streamServiceStub != null, "stream service is null");
-
-        return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout, WRITE_HISTOGRAM, options);
-    }
-
-    /**
-     * Create a build process for measure write.
-     *
-     * @param maxBulkSize   the max bulk size for the flush operation
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second
-     * @param concurrency   the number of concurrency would run for the flush 
max
-     * @param timeout       network timeout threshold in seconds.
-     * @return stream bulk write processor
-     */
-    public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int 
maxBulkSize, int flushInterval, int concurrency, int timeout) {
-        checkState(this.measureServiceStub != null, "measure service is null");
-
-        return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout, WRITE_HISTOGRAM, options);
-    }
-
     /**
      * Build a MeasureWrite request.
      *
@@ -399,22 +260,6 @@ public class BanyanDBClient implements Closeable {
         return new StreamWrite(this.metadataCache.findStreamMetadata(group, 
name), elementId);
     }
 
-    /**
-     * Build a trace bulk write processor.
-     *
-     * @param maxBulkSize   the max size of each flush. The actual size is 
determined by the length of byte array.
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second.
-     * @param concurrency   the number of concurrency would run for the flush 
max.
-     * @param timeout       network timeout threshold in seconds.
-     * @return trace bulk write processor
-     */
-    public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, 
int flushInterval, int concurrency, int timeout) {
-        checkState(this.traceServiceStub != null, "trace service is null");
-
-        return new TraceBulkWriteProcessor(this, maxBulkSize, flushInterval, 
concurrency, timeout, WRITE_HISTOGRAM, options);
-    }
-
     /**
      * Build a TraceWrite request without initial timestamp.
      *
@@ -958,40 +803,6 @@ public class BanyanDBClient implements Closeable {
         return registry.delete(group, name);
     }
 
-    /**
-     * Apply(Create or update) the property with {@link 
BanyandbProperty.ApplyRequest.Strategy#STRATEGY_MERGE}
-     *
-     * @param property the property to be stored in the BanyanBD
-     */
-    public ApplyResponse apply(Property property) throws BanyanDBException {
-        PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
-            "property",
-            "single_write",
-            options.getPrometheusMetricsOpts().getClientID()
-        ).startTimer()) {
-            return store.apply(property);
-        }
-    }
-
-    /**
-     * Apply(Create or update) the property
-     *
-     * @param property the property to be stored in the BanyanBD
-     * @param strategy dedicates how to apply the property
-     */
-    public ApplyResponse apply(Property property, Strategy strategy) throws
-            BanyanDBException {
-        PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
-            "property",
-            "single_write",
-            options.getPrometheusMetricsOpts().getClientID()
-        ).startTimer()) {
-            return store.apply(property, strategy);
-        }
-    }
-
     /**
      * Query properties
      *
@@ -1003,26 +814,6 @@ public class BanyanDBClient implements Closeable {
         return store.query(request);
     }
 
-    /**
-     * Delete property
-     *
-     * @param group group of the metadata
-     * @param name  name of the metadata
-     * @param id    identity of the property
-     * @return if this property has been deleted
-     */
-    public DeleteResponse deleteProperty(String group, String name, String id) 
throws
-            BanyanDBException {
-        PropertyStore store = new PropertyStore(checkNotNull(this.channel));
-        try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
-            "property",
-            "delete",
-            options.getPrometheusMetricsOpts().getClientID()
-        ).startTimer()) {
-            return store.delete(group, name, id);
-        }
-    }
-
     /**
      * Define a new trace
      *
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
deleted file mode 100644
index ce17e66..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import io.grpc.stub.StreamObserver;
-import io.prometheus.client.Histogram;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
-import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * MeasureBulkWriteProcessor works for measure flush.
- */
-@Slf4j
-@ThreadSafe
-public class MeasureBulkWriteProcessor extends 
AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest,
-        MeasureServiceGrpc.MeasureServiceStub> {
-    private final BanyanDBClient client;
-    private final Histogram writeHistogram;
-    private final Options options;
-
-    /**
-     * Create the processor.
-     *
-     * @param client         the client
-     * @param maxBulkSize    the max bulk size for the flush operation
-     * @param flushInterval  if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                       automatically. Unit is second.
-     * @param concurrency    the number of concurrency would run for the flush 
max.
-     * @param timeout        network timeout threshold in seconds.
-     */
-    protected MeasureBulkWriteProcessor(
-        final BanyanDBClient client,
-        final int maxBulkSize,
-        final int flushInterval,
-        final int concurrency,
-        final int timeout, final Histogram writeHistogram, final Options 
options) {
-        super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency, timeout);
-        this.client = client;
-        this.writeHistogram = writeHistogram;
-        this.options = options;
-    }
-
-    @Override
-    protected StreamObserver<BanyandbMeasure.WriteRequest> 
buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub,
-                                                                               
CompletableFuture<Void> batch) {
-        return stub.write(new StreamObserver<BanyandbMeasure.WriteResponse>() {
-            private final Set<String> schemaExpired = new HashSet<>();
-
-            @Override
-            public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
-                BanyandbModel.Status status = 
StatusUtil.convertStringToStatus(writeResponse.getStatus());
-                switch (status) {
-                    case STATUS_SUCCEED:
-                        break;
-                    case STATUS_EXPIRED_SCHEMA:
-                        BanyandbCommon.Metadata metadata = 
writeResponse.getMetadata();
-                        String schemaKey = metadata.getGroup() + "." + 
metadata.getName();
-                        if (!schemaExpired.contains(schemaKey)) {
-                            log.warn("The schema {} is expired, trying update 
the schema...", schemaKey);
-                            try {
-                                
client.updateMeasureMetadataCacheFromSever(metadata.getGroup(), 
metadata.getName());
-                                schemaExpired.add(schemaKey);
-                            } catch (BanyanDBException e) {
-                                log.error(e.getMessage(), e);
-                            }
-                        }
-                        break;
-                    default:
-                        log.warn("Write measure failed with status: {}", 
status);
-                }
-            }
-
-            @Override
-            public void onError(Throwable t) {
-                batch.completeExceptionally(t);
-                log.error("Error occurs in flushing measures", t);
-            }
-
-            @Override
-            public void onCompleted() {
-                batch.complete(null);
-            }
-        });
-    }
-
-    @Override
-    protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) 
{
-        Histogram.Timer timer = writeHistogram.labels(
-            "measure",
-            "bulk_write",
-            options.getPrometheusMetricsOpts().getClientID()
-        ).startTimer();
-        return super.doFlush(data, timer);
-    }
-}
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
deleted file mode 100644
index 01f042a..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import io.grpc.stub.StreamObserver;
-
-import io.prometheus.client.Histogram;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.stream.v1.StreamServiceGrpc;
-import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
-import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * StreamBulkWriteProcessor works for stream flush.
- */
-@Slf4j
-@ThreadSafe
-public class StreamBulkWriteProcessor extends 
AbstractBulkWriteProcessor<BanyandbStream.WriteRequest,
-        StreamServiceGrpc.StreamServiceStub> {
-    private final BanyanDBClient client;
-    private final Histogram writeHistogram;
-    private final Options options;
-
-    /**
-     * Create the processor.
-     *
-     * @param client        the client
-     * @param maxBulkSize   the max bulk size for the flush operation
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second.
-     * @param timeout       network timeout threshold in seconds.
-     * @param concurrency   the number of concurrency would run for the flush 
max.
-     */
-    protected StreamBulkWriteProcessor(
-            final BanyanDBClient client,
-            final int maxBulkSize,
-            final int flushInterval,
-            final int concurrency,
-            final int timeout,
-            final Histogram writeHistogram,
-            final Options options) {
-        super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency, timeout);
-        this.client = client;
-        this.writeHistogram = writeHistogram;
-        this.options = options;
-    }
-
-    @Override
-    protected StreamObserver<BanyandbStream.WriteRequest> 
buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub, 
CompletableFuture<Void> batch) {
-        return stub.write(
-                new StreamObserver<BanyandbStream.WriteResponse>() {
-                    private final Set<String> schemaExpired = new HashSet<>();
-
-                    @Override
-                    public void onNext(BanyandbStream.WriteResponse 
writeResponse) {
-                        BanyandbModel.Status status = 
StatusUtil.convertStringToStatus(writeResponse.getStatus());
-                        switch (status) {
-                            case STATUS_SUCCEED:
-                                break;
-                            case STATUS_EXPIRED_SCHEMA:
-                                BanyandbCommon.Metadata metadata = 
writeResponse.getMetadata();
-                                String schemaKey = metadata.getGroup() + "." + 
metadata.getName();
-                                if (!schemaExpired.contains(schemaKey)) {
-                                    log.warn("The schema {} is expired, trying 
update the schema...", schemaKey);
-                                    try {
-                                        
client.updateStreamMetadataCacheFromSever(metadata.getGroup(), 
metadata.getName());
-                                        schemaExpired.add(schemaKey);
-                                    } catch (BanyanDBException e) {
-                                        log.error(e.getMessage(), e);
-                                    }
-                                }
-                                break;
-                            default:
-                                log.warn("Write stream failed with status: 
{}", status);
-                        }
-                    }
-
-                    @Override
-                    public void onError(Throwable t) {
-                        batch.completeExceptionally(t);
-                        log.error("Error occurs in flushing streams", t);
-                    }
-
-                    @Override
-                    public void onCompleted() {
-                        batch.complete(null);
-                    }
-                });
-    }
-
-    @Override
-    protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) 
{
-        Histogram.Timer timer = writeHistogram.labels(
-            "stream",
-            "bulk_write",
-            options.getPrometheusMetricsOpts().getClientID()
-        ).startTimer();
-        return super.doFlush(data, timer);
-    }
-}
-
diff --git 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
 
b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
deleted file mode 100644
index 96fdc4d..0000000
--- 
a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.banyandb.v1.client;
-
-import io.grpc.stub.StreamObserver;
-
-import io.prometheus.client.Histogram;
-import java.util.List;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
-import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
-import org.apache.skywalking.banyandb.trace.v1.TraceServiceGrpc;
-import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace;
-import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
-import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * TraceBulkWriteProcessor works for trace flush.
- */
-@Slf4j
-@ThreadSafe
-public class TraceBulkWriteProcessor extends 
AbstractBulkWriteProcessor<BanyandbTrace.WriteRequest,
-        TraceServiceGrpc.TraceServiceStub> {
-    private final BanyanDBClient client;
-    private final Histogram writeHistogram;
-    private final Options options;
-
-    /**
-     * Create the processor.
-     *
-     * @param client        the client
-     * @param maxBulkSize   the max bulk size for the flush operation
-     * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
-     *                      automatically. Unit is second.
-     * @param timeout       network timeout threshold in seconds.
-     * @param concurrency   the number of concurrency would run for the flush 
max.
-     */
-    protected TraceBulkWriteProcessor(
-            final BanyanDBClient client,
-            final int maxBulkSize,
-            final int flushInterval,
-            final int concurrency,
-            final int timeout,
-            final Histogram writeHistogram,
-            final Options options) {
-        super(client.getTraceServiceStub(), "TraceBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency, timeout);
-        this.client = client;
-        this.writeHistogram = writeHistogram;
-        this.options = options;
-    }
-
-    @Override
-    protected StreamObserver<BanyandbTrace.WriteRequest> 
buildStreamObserver(TraceServiceGrpc.TraceServiceStub stub, 
CompletableFuture<Void> batch) {
-        return stub.write(
-                new StreamObserver<BanyandbTrace.WriteResponse>() {
-                    private final Set<String> schemaExpired = new HashSet<>();
-
-                    @Override
-                    public void onNext(BanyandbTrace.WriteResponse 
writeResponse) {
-                        BanyandbModel.Status status = 
StatusUtil.convertStringToStatus(writeResponse.getStatus());
-                        switch (status) {
-                            case STATUS_SUCCEED:
-                                break;
-                            case STATUS_EXPIRED_SCHEMA:
-                                BanyandbCommon.Metadata metadata = 
writeResponse.getMetadata();
-                                String schemaKey = metadata.getGroup() + "." + 
metadata.getName();
-                                if (!schemaExpired.contains(schemaKey)) {
-                                    log.warn("The trace schema {} is expired, 
trying update the schema...", schemaKey);
-                                    try {
-                                        
client.updateTraceMetadataCacheFromServer(metadata.getGroup(), 
metadata.getName());
-                                        schemaExpired.add(schemaKey);
-                                    } catch (BanyanDBException e) {
-                                        log.error(e.getMessage(), e);
-                                    }
-                                }
-                                break;
-                            default:
-                                log.warn("Write trace failed with status: {}", 
status);
-                        }
-                    }
-
-                    @Override
-                    public void onError(Throwable t) {
-                        batch.completeExceptionally(t);
-                        log.error("Error occurs in flushing traces", t);
-                    }
-
-                    @Override
-                    public void onCompleted() {
-                        batch.complete(null);
-                    }
-                });
-    }
-
-    @Override
-    protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) 
{
-        Histogram.Timer timer = writeHistogram.labels(
-            "trace",
-            "bulk_write",
-            options.getPrometheusMetricsOpts().getClientID()
-        ).startTimer();
-        return super.doFlush(data, timer);
-    }
-}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
index 67c96d5..7fddcf9 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import io.grpc.stub.StreamObserver;
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog;
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule;
@@ -34,6 +35,8 @@ import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure;
 import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagFamilySpec;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
+import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
 import org.junit.After;
@@ -44,7 +47,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -52,7 +54,6 @@ import java.util.concurrent.TimeoutException;
 import static org.awaitility.Awaitility.await;
 
 public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI {
-    private MeasureBulkWriteProcessor processor;
 
     @Before
     public void setUp() throws IOException, BanyanDBException, 
InterruptedException {
@@ -62,14 +63,10 @@ public class ITBanyanDBMeasureQueryTests extends 
BanyanDBClientTestCI {
         Assert.assertNotNull(expectedGroup);
         Measure expectedMeasure = buildMeasure();
         client.define(expectedMeasure);
-        processor = client.buildMeasureWriteProcessor(1000, 1, 1, 10);
     }
 
     @After
     public void tearDown() throws IOException {
-        if (this.processor != null) {
-            this.processor.close();
-        }
         this.closeClient();
     }
 
@@ -81,13 +78,29 @@ public class ITBanyanDBMeasureQueryTests extends 
BanyanDBClientTestCI {
 
         MeasureWrite measureWrite = client.createMeasureWrite("sw_metric", 
"service_cpm_minute", now.toEpochMilli());
         measureWrite.tag("entity_id", 
TagAndValue.stringTagValue("entity_1")).field("total", 
TagAndValue.longFieldValue(100)).field("value", TagAndValue.longFieldValue(1));
+        StreamObserver<BanyandbMeasure.WriteRequest> writeObserver
+            = client.getMeasureServiceStub().write(new 
StreamObserver<BanyandbMeasure.WriteResponse>() {
+            @Override
+            public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
+                
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(), 
writeResponse.getStatus());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                Assert.fail("write failed: " + throwable.getMessage());
+            }
+
+            @Override
+            public void onCompleted() {
 
-        CompletableFuture<Void> f = processor.add(measureWrite);
-        f.exceptionally(exp -> {
-            Assert.fail(exp.getMessage());
-            return null;
+            }
         });
-        f.get(10, TimeUnit.SECONDS);
+        try {
+            writeObserver.onNext(measureWrite.build());
+
+        } finally {
+            writeObserver.onCompleted();
+        }
 
         MeasureQuery query = new MeasureQuery(Lists.newArrayList("sw_metric"), 
"service_cpm_minute", new TimestampRange(begin.toEpochMilli(), now.plus(1, 
ChronoUnit.MINUTES).toEpochMilli()), ImmutableSet.of("entity_id"), // tags
                 ImmutableSet.of("total")); // fields
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
index be0fb09..e752e67 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java
@@ -83,7 +83,8 @@ public class ITBanyanDBPropertyTests extends 
BanyanDBClientTestCI {
         Property property = buildProperty("default", "sw", 
"ui_template").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build();
-        Assert.assertTrue(this.client.apply(property).getCreated());
+        PropertyStore store = new PropertyStore(client.getChannel());
+        Assert.assertTrue(store.apply(property).getCreated());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             BanyandbProperty.QueryResponse resp = 
client.query(BanyandbProperty.QueryRequest.newBuilder()
@@ -103,7 +104,8 @@ public class ITBanyanDBPropertyTests extends 
BanyanDBClientTestCI {
         Property property = buildProperty("default", "sw", 
"ui_template").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build();
-        Assert.assertTrue(this.client.apply(property).getCreated());
+        PropertyStore store = new PropertyStore(client.getChannel());
+        Assert.assertTrue(store.apply(property).getCreated());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             BanyandbProperty.QueryResponse resp = 
client.query(BanyandbProperty.QueryRequest.newBuilder()
@@ -116,8 +118,8 @@ public class ITBanyanDBPropertyTests extends 
BanyanDBClientTestCI {
             Assert.assertNotNull(gotProperty);
             Assert.assertEquals(property.getTagsList(), 
gotProperty.getTagsList());
         });
-
-        Assert.assertTrue(this.client.deleteProperty("default", "sw", 
"ui_template").getDeleted());
+        BanyandbProperty.DeleteResponse result = store.delete("default", "sw", 
"ui_template");
+        Assert.assertTrue(result.getDeleted());
         BanyandbProperty.QueryResponse resp = 
client.query(BanyandbProperty.QueryRequest.newBuilder()
                 .addGroups("default")
                 .setName("sw")
@@ -131,12 +133,13 @@ public class ITBanyanDBPropertyTests extends 
BanyanDBClientTestCI {
         Property property1 = buildProperty("default", "sw", 
"ui_template").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build();
-        Assert.assertTrue(this.client.apply(property1).getCreated());
+        PropertyStore store = new PropertyStore(client.getChannel());
+        Assert.assertTrue(store.apply(property1).getCreated());
 
         Property property2 = buildProperty("default", "sw", 
"ui_template").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("word")))).build();
-        Assert.assertFalse(this.client.apply(property2).getCreated());
+        Assert.assertFalse(store.apply(property2).getCreated());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             BanyandbProperty.QueryResponse resp = 
client.query(BanyandbProperty.QueryRequest.newBuilder()
@@ -156,11 +159,12 @@ public class ITBanyanDBPropertyTests extends 
BanyanDBClientTestCI {
         Property property = buildProperty("default", "sw", 
"id1").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("bar")))).build();
-        Assert.assertTrue(this.client.apply(property).getCreated());
+        PropertyStore store = new PropertyStore(client.getChannel());
+        Assert.assertTrue(store.apply(property).getCreated());
         property = buildProperty("default", "sw", "id2").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("foo")))).build();
-        Assert.assertTrue(this.client.apply(property).getCreated());
+        Assert.assertTrue(store.apply(property).getCreated());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             client.query(new PropertyQuery(Lists.newArrayList("default"), 
"sw", ImmutableSet.of("name")).build(null));
@@ -194,11 +198,12 @@ public class ITBanyanDBPropertyTests extends 
BanyanDBClientTestCI {
         Property property = buildProperty("default", "sw", 
"id1").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("bar")))).build();
-        Assert.assertTrue(this.client.apply(property).getCreated());
+        PropertyStore store = new PropertyStore(client.getChannel());
+        Assert.assertTrue(store.apply(property).getCreated());
         property = buildProperty("default", "sw", "id2").toBuilder().addTags(
             Tag.newBuilder().setKey("name").setValue(
                 
TagValue.newBuilder().setStr(Str.newBuilder().setValue("foo")))).build();
-        Assert.assertTrue(this.client.apply(property).getCreated());
+        Assert.assertTrue(store.apply(property).getCreated());
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
             BanyandbProperty.QueryResponse resp = client.query(new 
PropertyQuery(Lists.newArrayList("default"), "sw", 
ImmutableSet.of("name")).build());
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
index c394138..d098d81 100644
--- 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
+++ 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import io.grpc.stub.StreamObserver;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.Arrays;
@@ -37,16 +38,16 @@ import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
 import 
org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.io.IOException;
 import java.time.Instant;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -55,7 +56,6 @@ import static 
org.apache.skywalking.banyandb.v1.client.BanyanDBClient.DEFAULT_EX
 import static org.awaitility.Awaitility.await;
 
 public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI {
-    private StreamBulkWriteProcessor processor;
 
     @Before
     public void setUp() throws IOException, BanyanDBException, 
InterruptedException {
@@ -64,14 +64,10 @@ public class ITBanyanDBStreamQueryTests extends 
BanyanDBClientTestCI {
         this.client.define(buildStream());
         this.client.define(buildIndexRule());
         this.client.define(buildIndexRuleBinding());
-        processor = client.buildStreamWriteProcessor(1000, 1, 1, 10);
     }
 
     @After
     public void tearDown() throws IOException {
-        if (processor != null) {
-            this.processor.close();
-        }
         this.closeClient();
     }
 
@@ -110,13 +106,28 @@ public class ITBanyanDBStreamQueryTests extends 
BanyanDBClientTestCI {
                 .tag("mq.topic", Value.stringTagValue(topic)) // 11
                 .tag("mq.queue", Value.stringTagValue(queue)); // 12
         streamWrite.setTimestamp(now.toEpochMilli());
+        StreamObserver<BanyandbStream.WriteRequest> writeObserver
+            = client.getStreamServiceStub().write(new 
StreamObserver<BanyandbStream.WriteResponse>() {
+            @Override
+            public void onNext(BanyandbStream.WriteResponse writeResponse) {
+                
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(), 
writeResponse.getStatus());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                Assert.fail("write failed: " + throwable.getMessage());
+            }
 
-        CompletableFuture<Void> f = processor.add(streamWrite);
-        f.exceptionally(exp -> {
-            Assert.fail(exp.getMessage());
-            return null;
+            @Override
+            public void onCompleted() {
+            }
         });
-        f.get(10, TimeUnit.SECONDS);
+        try {
+            writeObserver.onNext(streamWrite.build());
+
+        } finally {
+            writeObserver.onCompleted();
+        }
 
         StreamQuery query = new StreamQuery(
             Lists.newArrayList("sw_record"), "trace", ImmutableSet.of("state", 
"duration", "trace_id", "data_binary"));
diff --git 
a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java 
b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
index 15a16df..0d45187 100644
--- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
+++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java
@@ -20,22 +20,23 @@ package org.apache.skywalking.banyandb.v1.client;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import io.grpc.stub.StreamObserver;
 import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
 import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
+import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
+import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace;
 import 
org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
 import org.apache.skywalking.banyandb.v1.client.util.TimeUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.io.IOException;
 import java.time.Instant;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -50,7 +51,6 @@ import static org.awaitility.Awaitility.await;
 public class ITTraceTest extends BanyanDBClientTestCI {
     private final String groupName = "sw_trace";
     private final String traceName = "trace_data";
-    private TraceBulkWriteProcessor processor;
 
     @Before
     public void setUp() throws IOException, BanyanDBException, 
InterruptedException {
@@ -95,15 +95,10 @@ public class ITTraceTest extends BanyanDBClientTestCI {
         this.client.define(trace);
         this.client.define(buildIndexRule());
         this.client.define(buildIndexRuleBinding());
-        
-        processor = client.buildTraceWriteProcessor(1000, 1, 1, 10);
     }
 
     @After
     public void tearDown() throws IOException {
-        if (processor != null) {
-            processor.close();
-        }
         this.closeClient();
     }
 
@@ -136,14 +131,29 @@ public class ITTraceTest extends BanyanDBClientTestCI {
             .tag("start_time", Value.timestampTagValue(now.toEpochMilli()))
             .span(spanData)
             .version(1L);
-            
-        // Write the trace via bulk processor
-        CompletableFuture<Void> writeFuture = processor.add(traceWrite);
-        writeFuture.exceptionally(exp -> {
-            Assert.fail("Write failed: " + exp.getMessage());
-            return null;
+
+        StreamObserver<BanyandbTrace.WriteRequest> writeObserver
+            = client.getTraceServiceStub().write(new 
StreamObserver<BanyandbTrace.WriteResponse>() {
+            @Override
+            public void onNext(BanyandbTrace.WriteResponse writeResponse) {
+                
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(), 
writeResponse.getStatus());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                Assert.fail("write failed: " + throwable.getMessage());
+            }
+
+            @Override
+            public void onCompleted() {
+
+            }
         });
-        writeFuture.get(10, TimeUnit.SECONDS);
+        try {
+            writeObserver.onNext(traceWrite.build());
+        } finally {
+            writeObserver.onCompleted();
+        }
         
         // Create trace query with trace_id condition
         TraceQuery query = new TraceQuery(
@@ -211,13 +221,30 @@ public class ITTraceTest extends BanyanDBClientTestCI {
             .tag("start_time", 
Value.timestampTagValue(baseTime.plusSeconds(120).toEpochMilli()))
             .span("span-data-3".getBytes())
             .version(1L);
-        
-        // Write the traces via bulk processor
-        CompletableFuture<Void> future1 = processor.add(trace1);
-        CompletableFuture<Void> future2 = processor.add(trace2);
-        CompletableFuture<Void> future3 = processor.add(trace3);
-        
-        CompletableFuture.allOf(future1, future2, future3).get(10, 
TimeUnit.SECONDS);
+        StreamObserver<BanyandbTrace.WriteRequest> writeObserver
+            = client.getTraceServiceStub().write(new 
StreamObserver<BanyandbTrace.WriteResponse>() {
+            @Override
+            public void onNext(BanyandbTrace.WriteResponse writeResponse) {
+                
Assert.assertEquals(BanyandbModel.Status.STATUS_SUCCEED.name(), 
writeResponse.getStatus());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                Assert.fail("write failed: " + throwable.getMessage());
+            }
+
+            @Override
+            public void onCompleted() {
+
+            }
+        });
+        try {
+            writeObserver.onNext(trace1.build());
+            writeObserver.onNext(trace2.build());
+            writeObserver.onNext(trace3.build());
+        } finally {
+            writeObserver.onCompleted();
+        }
         
         // Create trace query with order by start_time (no trace_id condition 
as it interferes with ordering)
         TraceQuery query = new TraceQuery(

Reply via email to