This is an automated email from the ASF dual-hosted git repository. wankai pushed a commit to branch remove-write-logic in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb-java-client.git
commit 141eb5190f19d99558a536a8527b7634019177ff Author: wankai123 <[email protected]> AuthorDate: Mon Oct 13 16:02:14 2025 +0800 Move write logic to OAP Server. --- .../v1/client/AbstractBulkWriteProcessor.java | 204 ------------------- .../banyandb/v1/client/AbstractWrite.java | 3 +- .../banyandb/v1/client/BanyanDBClient.java | 225 +-------------------- .../v1/client/MeasureBulkWriteProcessor.java | 124 ------------ .../v1/client/StreamBulkWriteProcessor.java | 128 ------------ .../v1/client/TraceBulkWriteProcessor.java | 127 ------------ .../v1/client/ITBanyanDBMeasureQueryTests.java | 34 +++- .../v1/client/ITBanyanDBPropertyTests.java | 25 ++- .../v1/client/ITBanyanDBStreamQueryTests.java | 33 ++- .../skywalking/banyandb/v1/client/ITTraceTest.java | 76 +++++-- 10 files changed, 125 insertions(+), 854 deletions(-) diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java deleted file mode 100644 index d850cd1..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.v1.client; - -import com.google.auto.value.AutoValue; -import io.grpc.stub.AbstractAsyncStub; -import io.grpc.stub.StreamObserver; -import io.prometheus.client.Histogram; -import java.io.Closeable; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import lombok.SneakyThrows; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf.GeneratedMessageV3, - STUB extends AbstractAsyncStub<STUB>> - implements Runnable, Closeable { - private final STUB stub; - private final int maxBulkSize; - private final int flushInterval; - private final ArrayBlockingQueue<Holder> requests; - private final Semaphore semaphore; - private final long flushInternalInMillis; - private final ScheduledThreadPoolExecutor scheduler; - private final int timeout; - private volatile long lastFlushTS = 0; - - /** - * Create the processor. - * - * @param stub an implementation of {@link AbstractAsyncStub} - * @param processorName name of the processor for logging - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param concurrency the number of concurrency would run for the flush max. - * @param timeout network timeout threshold in seconds. - */ - protected AbstractBulkWriteProcessor(STUB stub, - String processorName, - int maxBulkSize, - int flushInterval, - int concurrency, - int timeout) { - this.stub = stub; - this.maxBulkSize = maxBulkSize; - this.flushInterval = flushInterval; - this.timeout = timeout; - requests = new ArrayBlockingQueue<>(maxBulkSize + 1); - this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1); - - scheduler = new ScheduledThreadPoolExecutor(1, r -> { - final Thread thread = new Thread(r); - thread.setName("BanyanDB BulkProcessor"); - return thread; - }); - scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); - scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); - scheduler.setRemoveOnCancelPolicy(true); - flushInternalInMillis = flushInterval * 1000; - scheduler.scheduleWithFixedDelay( - this, 0, flushInterval, TimeUnit.SECONDS); - } - - /** - * Add the measure to the bulk processor. - * - * @param writeEntity to add. - */ - @SneakyThrows - public CompletableFuture<Void> add(AbstractWrite<REQ> writeEntity) { - final CompletableFuture<Void> f = new CompletableFuture<>(); - requests.put(Holder.create(writeEntity, f)); - flushIfNeeded(); - return f; - } - - public void run() { - try { - doPeriodicalFlush(); - } catch (Throwable t) { - log.error("Failed to flush data to BanyanDB", t); - } - } - - @SneakyThrows - protected void flushIfNeeded() { - if (requests.size() >= maxBulkSize) { - flush(); - } - } - - private void doPeriodicalFlush() { - if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 2) { - // Run periodical flush if there is no `flushIfNeeded` executed in the second half of the flush period. - // Otherwise, wait for the next round. By default, the last 2 seconds of the 5s period. - // This could avoid periodical flush running among bulks(controlled by bulkActions). - flush(); - } - } - - public void flush() { - if (requests.isEmpty()) { - return; - } - - try { - semaphore.acquire(); - } catch (InterruptedException e) { - log.error("Interrupted when trying to get semaphore to execute bulk requests", e); - return; - } - - final List<Holder> batch = new ArrayList<>(requests.size()); - requests.drainTo(batch); - final CompletableFuture<Void> future = doObservedFlush(batch); - future.whenComplete((v, t) -> semaphore.release()); - future.join(); - lastFlushTS = System.currentTimeMillis(); - - } - - protected abstract CompletableFuture<Void> doObservedFlush(final List<Holder> data); - - protected CompletableFuture<Void> doFlush(final List<Holder> data, Histogram.Timer timer) { - // The batch is used to control the completion of the flush operation. - // There is at most one error per batch, - // because the database server would terminate the batch process when the first error occurs. - final CompletableFuture<Void> batch = new CompletableFuture<>(); - final StreamObserver<REQ> writeRequestStreamObserver - = this.buildStreamObserver(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS), batch); - - try { - data.forEach(h -> { - AbstractWrite<REQ> entity = (AbstractWrite<REQ>) h.writeEntity(); - REQ request; - try { - request = entity.build(); - } catch (Throwable bt) { - log.error("building the entity fails: {}", entity.toString(), bt); - h.future().completeExceptionally(bt); - return; - } - writeRequestStreamObserver.onNext(request); - h.future().complete(null); - }); - } finally { - writeRequestStreamObserver.onCompleted(); - } - batch.whenComplete((ignored, exp) -> { - timer.observeDuration(); - if (exp != null) { - log.error("Failed to execute requests in bulk", exp); - } - }); - return batch; - } - - public void close() { - scheduler.shutdownNow(); - } - - protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub, CompletableFuture<Void> batch); - - @AutoValue - static abstract class Holder { - abstract AbstractWrite writeEntity(); - - abstract CompletableFuture<Void> future(); - - public static <REQ extends com.google.protobuf.GeneratedMessageV3> Holder create(AbstractWrite<REQ> writeEntity, - CompletableFuture<Void> future) { - future.whenComplete((v, t) -> { - if (t != null) { - log.error("Failed to execute the request: {}", writeEntity.toString(), t); - } - }); - return new AutoValue_AbstractBulkWriteProcessor_Holder(writeEntity, future); - } - - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java index bae244d..0f29c45 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java @@ -44,6 +44,7 @@ public abstract class AbstractWrite<P extends com.google.protobuf.GeneratedMessa protected final Object[] tags; + @Getter protected final MetadataCache.EntityMetadata entityMetadata; public AbstractWrite(MetadataCache.EntityMetadata entityMetadata, long timestamp) { @@ -71,7 +72,7 @@ public abstract class AbstractWrite<P extends com.google.protobuf.GeneratedMessa return this; } - P build() { + public P build() { BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder() .setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).setModRevision(entityMetadata.getModRevision()).build(); diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java index bcae829..d9505c1 100644 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java +++ b/src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java @@ -26,11 +26,8 @@ import io.grpc.Channel; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; import io.grpc.Status; -import io.grpc.stub.StreamObserver; -import io.prometheus.client.Histogram; import java.time.ZoneOffset; import java.time.ZonedDateTime; -import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; @@ -44,12 +41,7 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Subject; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Trace; -import org.apache.skywalking.banyandb.model.v1.BanyandbModel; import org.apache.skywalking.banyandb.property.v1.BanyandbProperty; -import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property; -import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyRequest.Strategy; -import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.ApplyResponse; -import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.DeleteResponse; import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure; import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc; import org.apache.skywalking.banyandb.stream.v1.BanyandbStream; @@ -61,8 +53,6 @@ import org.apache.skywalking.banyandb.v1.client.grpc.HandleExceptionsWith; import org.apache.skywalking.banyandb.v1.client.grpc.channel.ChannelManager; import org.apache.skywalking.banyandb.v1.client.grpc.channel.DefaultChannelFactory; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; -import org.apache.skywalking.banyandb.v1.client.grpc.exception.InternalException; -import org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidArgumentException; import org.apache.skywalking.banyandb.v1.client.metadata.GroupMetadataRegistry; import org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleBindingMetadataRegistry; import org.apache.skywalking.banyandb.v1.client.metadata.IndexRuleMetadataRegistry; @@ -81,12 +71,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import org.apache.skywalking.banyandb.v1.client.util.StatusUtil; import org.apache.skywalking.banyandb.v1.client.util.TimeUtils; import static com.google.common.base.Preconditions.checkNotNull; @@ -106,47 +94,46 @@ import static com.google.common.base.Preconditions.checkState; @Slf4j public class BanyanDBClient implements Closeable { public static final ZonedDateTime DEFAULT_EXPIRE_AT = ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC); - private static Histogram WRITE_HISTOGRAM; private final String[] targets; /** * Options for server connection. */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private final Options options; /** * gRPC connection. */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private volatile Channel channel; /** * gRPC client stub */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private StreamServiceGrpc.StreamServiceStub streamServiceStub; /** * gRPC client stub */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private MeasureServiceGrpc.MeasureServiceStub measureServiceStub; /** * gRPC client stub */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private TraceServiceGrpc.TraceServiceStub traceServiceStub; /** * gRPC future stub. */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private StreamServiceGrpc.StreamServiceBlockingStub streamServiceBlockingStub; /** * gRPC future stub. */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private MeasureServiceGrpc.MeasureServiceBlockingStub measureServiceBlockingStub; /** * gRPC future stub. */ - @Getter(value = AccessLevel.PACKAGE) + @Getter private TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub; /** * The connection status. @@ -162,16 +149,6 @@ public class BanyanDBClient implements Closeable { */ private final MetadataCache metadataCache; - static { - // init prometheus metric - WRITE_HISTOGRAM = Histogram.build() - .name("banyandb_write_latency_seconds") - .help("BanyanDB Bulk Write latency in seconds") - .buckets(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0) - .labelNames("catalog", "operation", "instanceID") - .register(); - } - /** * Create a BanyanDB client instance with a default options. * @@ -255,122 +232,6 @@ public class BanyanDBClient implements Closeable { } } - /** - * Perform a single write with given entity. - * - * @param streamWrite the entity to be written - * @return a future of write result - */ - public CompletableFuture<Void> write(StreamWrite streamWrite) { - checkState(this.streamServiceStub != null, "stream service is null"); - - Histogram.Timer timer - = WRITE_HISTOGRAM.labels( - "stream", - "single_write", // single write for non-bulk operation. - options.getPrometheusMetricsOpts().getClientID() - ) - .startTimer(); - CompletableFuture<Void> future = new CompletableFuture<>(); - final StreamObserver<BanyandbStream.WriteRequest> writeRequestStreamObserver - = this.streamServiceStub - .withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS) - .write( - new StreamObserver<BanyandbStream.WriteResponse>() { - private BanyanDBException responseException; - - @Override - public void onNext(BanyandbStream.WriteResponse writeResponse) { - BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus()); - switch (status) { - case STATUS_SUCCEED: - break; - case STATUS_INVALID_TIMESTAMP: - responseException = new InvalidArgumentException( - "Invalid timestamp: " + streamWrite.getTimestamp(), null, Status.Code.INVALID_ARGUMENT, false); - break; - case STATUS_NOT_FOUND: - responseException = new InvalidArgumentException( - "Invalid metadata: " + streamWrite.entityMetadata, null, Status.Code.INVALID_ARGUMENT, false); - break; - case STATUS_EXPIRED_SCHEMA: - BanyandbCommon.Metadata metadata = writeResponse.getMetadata(); - log.warn("The schema {}.{} is expired, trying update the schema...", - metadata.getGroup(), metadata.getName()); - try { - BanyanDBClient.this.updateStreamMetadataCacheFromSever(metadata.getGroup(), metadata.getName()); - } catch (BanyanDBException e) { - String warnMessage = String.format("Failed to refresh the stream schema %s.%s", - metadata.getGroup(), metadata.getName()); - log.warn(warnMessage, e); - } - responseException = new InvalidArgumentException( - "Expired revision: " + metadata.getModRevision(), null, Status.Code.INVALID_ARGUMENT, true); - break; - default: - responseException = new InternalException( - String.format("Internal error (%s) occurs in server", writeResponse.getStatus()), null, Status.Code.INTERNAL, true); - break; - } - } - - @Override - public void onError(Throwable throwable) { - timer.observeDuration(); - log.error("Error occurs in flushing streams.", throwable); - future.completeExceptionally(throwable); - } - - @Override - public void onCompleted() { - timer.observeDuration(); - if (responseException == null) { - future.complete(null); - } else { - future.completeExceptionally(responseException); - } - } - }); - try { - writeRequestStreamObserver.onNext(streamWrite.build()); - } finally { - writeRequestStreamObserver.onCompleted(); - } - return future; - } - - /** - * Create a build process for stream write. - * - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second - * @param concurrency the number of concurrency would run for the flush max - * @param timeout network timeout threshold in seconds. - * @return stream bulk write processor - */ - public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { - checkState(this.streamServiceStub != null, "stream service is null"); - - return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options); - } - - /** - * Create a build process for measure write. - * - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second - * @param concurrency the number of concurrency would run for the flush max - * @param timeout network timeout threshold in seconds. - * @return stream bulk write processor - */ - public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { - checkState(this.measureServiceStub != null, "measure service is null"); - - return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options); - } - /** * Build a MeasureWrite request. * @@ -399,22 +260,6 @@ public class BanyanDBClient implements Closeable { return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId); } - /** - * Build a trace bulk write processor. - * - * @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array. - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param concurrency the number of concurrency would run for the flush max. - * @param timeout network timeout threshold in seconds. - * @return trace bulk write processor - */ - public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) { - checkState(this.traceServiceStub != null, "trace service is null"); - - return new TraceBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options); - } - /** * Build a TraceWrite request without initial timestamp. * @@ -958,40 +803,6 @@ public class BanyanDBClient implements Closeable { return registry.delete(group, name); } - /** - * Apply(Create or update) the property with {@link BanyandbProperty.ApplyRequest.Strategy#STRATEGY_MERGE} - * - * @param property the property to be stored in the BanyanBD - */ - public ApplyResponse apply(Property property) throws BanyanDBException { - PropertyStore store = new PropertyStore(checkNotNull(this.channel)); - try (Histogram.Timer timer = WRITE_HISTOGRAM.labels( - "property", - "single_write", - options.getPrometheusMetricsOpts().getClientID() - ).startTimer()) { - return store.apply(property); - } - } - - /** - * Apply(Create or update) the property - * - * @param property the property to be stored in the BanyanBD - * @param strategy dedicates how to apply the property - */ - public ApplyResponse apply(Property property, Strategy strategy) throws - BanyanDBException { - PropertyStore store = new PropertyStore(checkNotNull(this.channel)); - try (Histogram.Timer timer = WRITE_HISTOGRAM.labels( - "property", - "single_write", - options.getPrometheusMetricsOpts().getClientID() - ).startTimer()) { - return store.apply(property, strategy); - } - } - /** * Query properties * @@ -1003,26 +814,6 @@ public class BanyanDBClient implements Closeable { return store.query(request); } - /** - * Delete property - * - * @param group group of the metadata - * @param name name of the metadata - * @param id identity of the property - * @return if this property has been deleted - */ - public DeleteResponse deleteProperty(String group, String name, String id) throws - BanyanDBException { - PropertyStore store = new PropertyStore(checkNotNull(this.channel)); - try (Histogram.Timer timer = WRITE_HISTOGRAM.labels( - "property", - "delete", - options.getPrometheusMetricsOpts().getClientID() - ).startTimer()) { - return store.delete(group, name, id); - } - } - /** * Define a new trace * diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java deleted file mode 100644 index ce17e66..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.v1.client; - -import io.grpc.stub.StreamObserver; -import io.prometheus.client.Histogram; -import java.util.List; -import lombok.extern.slf4j.Slf4j; - -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; -import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure; -import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc; -import org.apache.skywalking.banyandb.model.v1.BanyandbModel; -import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; -import org.apache.skywalking.banyandb.v1.client.util.StatusUtil; - -import javax.annotation.concurrent.ThreadSafe; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -/** - * MeasureBulkWriteProcessor works for measure flush. - */ -@Slf4j -@ThreadSafe -public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest, - MeasureServiceGrpc.MeasureServiceStub> { - private final BanyanDBClient client; - private final Histogram writeHistogram; - private final Options options; - - /** - * Create the processor. - * - * @param client the client - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param concurrency the number of concurrency would run for the flush max. - * @param timeout network timeout threshold in seconds. - */ - protected MeasureBulkWriteProcessor( - final BanyanDBClient client, - final int maxBulkSize, - final int flushInterval, - final int concurrency, - final int timeout, final Histogram writeHistogram, final Options options) { - super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout); - this.client = client; - this.writeHistogram = writeHistogram; - this.options = options; - } - - @Override - protected StreamObserver<BanyandbMeasure.WriteRequest> buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub, - CompletableFuture<Void> batch) { - return stub.write(new StreamObserver<BanyandbMeasure.WriteResponse>() { - private final Set<String> schemaExpired = new HashSet<>(); - - @Override - public void onNext(BanyandbMeasure.WriteResponse writeResponse) { - BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus()); - switch (status) { - case STATUS_SUCCEED: - break; - case STATUS_EXPIRED_SCHEMA: - BanyandbCommon.Metadata metadata = writeResponse.getMetadata(); - String schemaKey = metadata.getGroup() + "." + metadata.getName(); - if (!schemaExpired.contains(schemaKey)) { - log.warn("The schema {} is expired, trying update the schema...", schemaKey); - try { - client.updateMeasureMetadataCacheFromSever(metadata.getGroup(), metadata.getName()); - schemaExpired.add(schemaKey); - } catch (BanyanDBException e) { - log.error(e.getMessage(), e); - } - } - break; - default: - log.warn("Write measure failed with status: {}", status); - } - } - - @Override - public void onError(Throwable t) { - batch.completeExceptionally(t); - log.error("Error occurs in flushing measures", t); - } - - @Override - public void onCompleted() { - batch.complete(null); - } - }); - } - - @Override - protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) { - Histogram.Timer timer = writeHistogram.labels( - "measure", - "bulk_write", - options.getPrometheusMetricsOpts().getClientID() - ).startTimer(); - return super.doFlush(data, timer); - } -} diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java deleted file mode 100644 index 01f042a..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.v1.client; - -import io.grpc.stub.StreamObserver; - -import io.prometheus.client.Histogram; -import java.util.List; -import lombok.extern.slf4j.Slf4j; - -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; -import org.apache.skywalking.banyandb.model.v1.BanyandbModel; -import org.apache.skywalking.banyandb.stream.v1.StreamServiceGrpc; -import org.apache.skywalking.banyandb.stream.v1.BanyandbStream; -import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; -import org.apache.skywalking.banyandb.v1.client.util.StatusUtil; - -import javax.annotation.concurrent.ThreadSafe; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -/** - * StreamBulkWriteProcessor works for stream flush. - */ -@Slf4j -@ThreadSafe -public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbStream.WriteRequest, - StreamServiceGrpc.StreamServiceStub> { - private final BanyanDBClient client; - private final Histogram writeHistogram; - private final Options options; - - /** - * Create the processor. - * - * @param client the client - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param timeout network timeout threshold in seconds. - * @param concurrency the number of concurrency would run for the flush max. - */ - protected StreamBulkWriteProcessor( - final BanyanDBClient client, - final int maxBulkSize, - final int flushInterval, - final int concurrency, - final int timeout, - final Histogram writeHistogram, - final Options options) { - super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout); - this.client = client; - this.writeHistogram = writeHistogram; - this.options = options; - } - - @Override - protected StreamObserver<BanyandbStream.WriteRequest> buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub, CompletableFuture<Void> batch) { - return stub.write( - new StreamObserver<BanyandbStream.WriteResponse>() { - private final Set<String> schemaExpired = new HashSet<>(); - - @Override - public void onNext(BanyandbStream.WriteResponse writeResponse) { - BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus()); - switch (status) { - case STATUS_SUCCEED: - break; - case STATUS_EXPIRED_SCHEMA: - BanyandbCommon.Metadata metadata = writeResponse.getMetadata(); - String schemaKey = metadata.getGroup() + "." + metadata.getName(); - if (!schemaExpired.contains(schemaKey)) { - log.warn("The schema {} is expired, trying update the schema...", schemaKey); - try { - client.updateStreamMetadataCacheFromSever(metadata.getGroup(), metadata.getName()); - schemaExpired.add(schemaKey); - } catch (BanyanDBException e) { - log.error(e.getMessage(), e); - } - } - break; - default: - log.warn("Write stream failed with status: {}", status); - } - } - - @Override - public void onError(Throwable t) { - batch.completeExceptionally(t); - log.error("Error occurs in flushing streams", t); - } - - @Override - public void onCompleted() { - batch.complete(null); - } - }); - } - - @Override - protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) { - Histogram.Timer timer = writeHistogram.labels( - "stream", - "bulk_write", - options.getPrometheusMetricsOpts().getClientID() - ).startTimer(); - return super.doFlush(data, timer); - } -} - diff --git a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java b/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java deleted file mode 100644 index 96fdc4d..0000000 --- a/src/main/java/org/apache/skywalking/banyandb/v1/client/TraceBulkWriteProcessor.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.skywalking.banyandb.v1.client; - -import io.grpc.stub.StreamObserver; - -import io.prometheus.client.Histogram; -import java.util.List; -import lombok.extern.slf4j.Slf4j; - -import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; -import org.apache.skywalking.banyandb.model.v1.BanyandbModel; -import org.apache.skywalking.banyandb.trace.v1.TraceServiceGrpc; -import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace; -import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; -import org.apache.skywalking.banyandb.v1.client.util.StatusUtil; - -import javax.annotation.concurrent.ThreadSafe; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -/** - * TraceBulkWriteProcessor works for trace flush. - */ -@Slf4j -@ThreadSafe -public class TraceBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbTrace.WriteRequest, - TraceServiceGrpc.TraceServiceStub> { - private final BanyanDBClient client; - private final Histogram writeHistogram; - private final Options options; - - /** - * Create the processor. - * - * @param client the client - * @param maxBulkSize the max bulk size for the flush operation - * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger - * automatically. Unit is second. - * @param timeout network timeout threshold in seconds. - * @param concurrency the number of concurrency would run for the flush max. - */ - protected TraceBulkWriteProcessor( - final BanyanDBClient client, - final int maxBulkSize, - final int flushInterval, - final int concurrency, - final int timeout, - final Histogram writeHistogram, - final Options options) { - super(client.getTraceServiceStub(), "TraceBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout); - this.client = client; - this.writeHistogram = writeHistogram; - this.options = options; - } - - @Override - protected StreamObserver<BanyandbTrace.WriteRequest> buildStreamObserver(TraceServiceGrpc.TraceServiceStub stub, CompletableFuture<Void> batch) { - return stub.write( - new StreamObserver<BanyandbTrace.WriteResponse>() { - private final Set<String> schemaExpired = new HashSet<>(); - - @Override - public void onNext(BanyandbTrace.WriteResponse writeResponse) { - BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus()); - switch (status) { - case STATUS_SUCCEED: - break; - case STATUS_EXPIRED_SCHEMA: - BanyandbCommon.Metadata metadata = writeResponse.getMetadata(); - String schemaKey = metadata.getGroup() + "." + metadata.getName(); - if (!schemaExpired.contains(schemaKey)) { - log.warn("The trace schema {} is expired, trying update the schema...", schemaKey); - try { - client.updateTraceMetadataCacheFromServer(metadata.getGroup(), metadata.getName()); - schemaExpired.add(schemaKey); - } catch (BanyanDBException e) { - log.error(e.getMessage(), e); - } - } - break; - default: - log.warn("Write trace failed with status: {}", status); - } - } - - @Override - public void onError(Throwable t) { - batch.completeExceptionally(t); - log.error("Error occurs in flushing traces", t); - } - - @Override - public void onCompleted() { - batch.complete(null); - } - }); - } - - @Override - protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) { - Histogram.Timer timer = writeHistogram.labels( - "trace", - "bulk_write", - options.getPrometheusMetricsOpts().getClientID() - ).startTimer(); - return super.doFlush(data, timer); - } -} \ No newline at end of file diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java index 67c96d5..fb3c0f5 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBMeasureQueryTests.java @@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import io.grpc.stub.StreamObserver; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule; @@ -34,6 +35,7 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Measure; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagFamilySpec; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType; +import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.metadata.Duration; import org.junit.After; @@ -44,7 +46,6 @@ import org.junit.Test; import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -52,7 +53,6 @@ import java.util.concurrent.TimeoutException; import static org.awaitility.Awaitility.await; public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI { - private MeasureBulkWriteProcessor processor; @Before public void setUp() throws IOException, BanyanDBException, InterruptedException { @@ -62,14 +62,10 @@ public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI { Assert.assertNotNull(expectedGroup); Measure expectedMeasure = buildMeasure(); client.define(expectedMeasure); - processor = client.buildMeasureWriteProcessor(1000, 1, 1, 10); } @After public void tearDown() throws IOException { - if (this.processor != null) { - this.processor.close(); - } this.closeClient(); } @@ -81,13 +77,29 @@ public class ITBanyanDBMeasureQueryTests extends BanyanDBClientTestCI { MeasureWrite measureWrite = client.createMeasureWrite("sw_metric", "service_cpm_minute", now.toEpochMilli()); measureWrite.tag("entity_id", TagAndValue.stringTagValue("entity_1")).field("total", TagAndValue.longFieldValue(100)).field("value", TagAndValue.longFieldValue(1)); + StreamObserver<BanyandbMeasure.WriteRequest> writeObserver + = client.getMeasureServiceStub().write(new StreamObserver<BanyandbMeasure.WriteResponse>() { + @Override + public void onNext(BanyandbMeasure.WriteResponse writeResponse) { + Assert.assertEquals("STATUS_SUCCEED", writeResponse.getStatus()); + } + + @Override + public void onError(Throwable throwable) { + Assert.fail("write failed: " + throwable.getMessage()); + } + + @Override + public void onCompleted() { - CompletableFuture<Void> f = processor.add(measureWrite); - f.exceptionally(exp -> { - Assert.fail(exp.getMessage()); - return null; + } }); - f.get(10, TimeUnit.SECONDS); + try { + writeObserver.onNext(measureWrite.build()); + + } finally { + writeObserver.onCompleted(); + } MeasureQuery query = new MeasureQuery(Lists.newArrayList("sw_metric"), "service_cpm_minute", new TimestampRange(begin.toEpochMilli(), now.plus(1, ChronoUnit.MINUTES).toEpochMilli()), ImmutableSet.of("entity_id"), // tags ImmutableSet.of("total")); // fields diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java index be0fb09..e752e67 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBPropertyTests.java @@ -83,7 +83,8 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI { Property property = buildProperty("default", "sw", "ui_template").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build(); - Assert.assertTrue(this.client.apply(property).getCreated()); + PropertyStore store = new PropertyStore(client.getChannel()); + Assert.assertTrue(store.apply(property).getCreated()); await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { BanyandbProperty.QueryResponse resp = client.query(BanyandbProperty.QueryRequest.newBuilder() @@ -103,7 +104,8 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI { Property property = buildProperty("default", "sw", "ui_template").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build(); - Assert.assertTrue(this.client.apply(property).getCreated()); + PropertyStore store = new PropertyStore(client.getChannel()); + Assert.assertTrue(store.apply(property).getCreated()); await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { BanyandbProperty.QueryResponse resp = client.query(BanyandbProperty.QueryRequest.newBuilder() @@ -116,8 +118,8 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI { Assert.assertNotNull(gotProperty); Assert.assertEquals(property.getTagsList(), gotProperty.getTagsList()); }); - - Assert.assertTrue(this.client.deleteProperty("default", "sw", "ui_template").getDeleted()); + BanyandbProperty.DeleteResponse result = store.delete("default", "sw", "ui_template"); + Assert.assertTrue(result.getDeleted()); BanyandbProperty.QueryResponse resp = client.query(BanyandbProperty.QueryRequest.newBuilder() .addGroups("default") .setName("sw") @@ -131,12 +133,13 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI { Property property1 = buildProperty("default", "sw", "ui_template").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("hello")))).build(); - Assert.assertTrue(this.client.apply(property1).getCreated()); + PropertyStore store = new PropertyStore(client.getChannel()); + Assert.assertTrue(store.apply(property1).getCreated()); Property property2 = buildProperty("default", "sw", "ui_template").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("word")))).build(); - Assert.assertFalse(this.client.apply(property2).getCreated()); + Assert.assertFalse(store.apply(property2).getCreated()); await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { BanyandbProperty.QueryResponse resp = client.query(BanyandbProperty.QueryRequest.newBuilder() @@ -156,11 +159,12 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI { Property property = buildProperty("default", "sw", "id1").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("bar")))).build(); - Assert.assertTrue(this.client.apply(property).getCreated()); + PropertyStore store = new PropertyStore(client.getChannel()); + Assert.assertTrue(store.apply(property).getCreated()); property = buildProperty("default", "sw", "id2").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("foo")))).build(); - Assert.assertTrue(this.client.apply(property).getCreated()); + Assert.assertTrue(store.apply(property).getCreated()); await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { client.query(new PropertyQuery(Lists.newArrayList("default"), "sw", ImmutableSet.of("name")).build(null)); @@ -194,11 +198,12 @@ public class ITBanyanDBPropertyTests extends BanyanDBClientTestCI { Property property = buildProperty("default", "sw", "id1").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("bar")))).build(); - Assert.assertTrue(this.client.apply(property).getCreated()); + PropertyStore store = new PropertyStore(client.getChannel()); + Assert.assertTrue(store.apply(property).getCreated()); property = buildProperty("default", "sw", "id2").toBuilder().addTags( Tag.newBuilder().setKey("name").setValue( TagValue.newBuilder().setStr(Str.newBuilder().setValue("foo")))).build(); - Assert.assertTrue(this.client.apply(property).getCreated()); + Assert.assertTrue(store.apply(property).getCreated()); await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { BanyandbProperty.QueryResponse resp = client.query(new PropertyQuery(Lists.newArrayList("default"), "sw", ImmutableSet.of("name")).build()); diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java index c394138..f30ceca 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java @@ -20,6 +20,7 @@ package org.apache.skywalking.banyandb.v1.client; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import io.grpc.stub.StreamObserver; import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Arrays; @@ -37,6 +38,7 @@ import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Stream; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding; +import org.apache.skywalking.banyandb.stream.v1.BanyandbStream; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.util.TimeUtils; import org.junit.After; @@ -46,7 +48,6 @@ import org.junit.Test; import java.io.IOException; import java.time.Instant; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -55,7 +56,6 @@ import static org.apache.skywalking.banyandb.v1.client.BanyanDBClient.DEFAULT_EX import static org.awaitility.Awaitility.await; public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { - private StreamBulkWriteProcessor processor; @Before public void setUp() throws IOException, BanyanDBException, InterruptedException { @@ -64,14 +64,10 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { this.client.define(buildStream()); this.client.define(buildIndexRule()); this.client.define(buildIndexRuleBinding()); - processor = client.buildStreamWriteProcessor(1000, 1, 1, 10); } @After public void tearDown() throws IOException { - if (processor != null) { - this.processor.close(); - } this.closeClient(); } @@ -110,13 +106,28 @@ public class ITBanyanDBStreamQueryTests extends BanyanDBClientTestCI { .tag("mq.topic", Value.stringTagValue(topic)) // 11 .tag("mq.queue", Value.stringTagValue(queue)); // 12 streamWrite.setTimestamp(now.toEpochMilli()); + StreamObserver<BanyandbStream.WriteRequest> writeObserver + = client.getStreamServiceStub().write(new StreamObserver<BanyandbStream.WriteResponse>() { + @Override + public void onNext(BanyandbStream.WriteResponse writeResponse) { + Assert.assertEquals("STATUS_SUCCEED", writeResponse.getStatus()); + } + + @Override + public void onError(Throwable throwable) { + Assert.fail("write failed: " + throwable.getMessage()); + } - CompletableFuture<Void> f = processor.add(streamWrite); - f.exceptionally(exp -> { - Assert.fail(exp.getMessage()); - return null; + @Override + public void onCompleted() { + } }); - f.get(10, TimeUnit.SECONDS); + try { + writeObserver.onNext(streamWrite.build()); + + } finally { + writeObserver.onCompleted(); + } StreamQuery query = new StreamQuery( Lists.newArrayList("sw_record"), "trace", ImmutableSet.of("state", "duration", "trace_id", "data_binary")); diff --git a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java index 15a16df..9e608cc 100644 --- a/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java +++ b/src/test/java/org/apache/skywalking/banyandb/v1/client/ITTraceTest.java @@ -20,8 +20,10 @@ package org.apache.skywalking.banyandb.v1.client; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import io.grpc.stub.StreamObserver; import org.apache.skywalking.banyandb.common.v1.BanyandbCommon; import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase; +import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace; import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException; import org.apache.skywalking.banyandb.v1.client.util.TimeUtils; import org.junit.After; @@ -35,7 +37,6 @@ import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collections; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -50,7 +51,6 @@ import static org.awaitility.Awaitility.await; public class ITTraceTest extends BanyanDBClientTestCI { private final String groupName = "sw_trace"; private final String traceName = "trace_data"; - private TraceBulkWriteProcessor processor; @Before public void setUp() throws IOException, BanyanDBException, InterruptedException { @@ -95,15 +95,10 @@ public class ITTraceTest extends BanyanDBClientTestCI { this.client.define(trace); this.client.define(buildIndexRule()); this.client.define(buildIndexRuleBinding()); - - processor = client.buildTraceWriteProcessor(1000, 1, 1, 10); } @After public void tearDown() throws IOException { - if (processor != null) { - processor.close(); - } this.closeClient(); } @@ -136,14 +131,36 @@ public class ITTraceTest extends BanyanDBClientTestCI { .tag("start_time", Value.timestampTagValue(now.toEpochMilli())) .span(spanData) .version(1L); - - // Write the trace via bulk processor - CompletableFuture<Void> writeFuture = processor.add(traceWrite); - writeFuture.exceptionally(exp -> { - Assert.fail("Write failed: " + exp.getMessage()); - return null; + + StreamObserver<BanyandbTrace.WriteRequest> writeObserver + = client.getTraceServiceStub().write(new StreamObserver<BanyandbTrace.WriteResponse>() { + @Override + public void onNext(BanyandbTrace.WriteResponse writeResponse) { + Assert.assertEquals("STATUS_SUCCEED", writeResponse.getStatus()); + } + + @Override + public void onError(Throwable throwable) { + Assert.fail("write failed: " + throwable.getMessage()); + } + + @Override + public void onCompleted() { + + } }); - writeFuture.get(10, TimeUnit.SECONDS); + try { + writeObserver.onNext(traceWrite.build()); + } finally { + writeObserver.onCompleted(); + } + // Write the trace via bulk processor +// CompletableFuture<Void> writeFuture = processor.add(traceWrite); +// writeFuture.exceptionally(exp -> { +// Assert.fail("Write failed: " + exp.getMessage()); +// return null; +// }); +// writeFuture.get(10, TimeUnit.SECONDS); // Create trace query with trace_id condition TraceQuery query = new TraceQuery( @@ -211,13 +228,30 @@ public class ITTraceTest extends BanyanDBClientTestCI { .tag("start_time", Value.timestampTagValue(baseTime.plusSeconds(120).toEpochMilli())) .span("span-data-3".getBytes()) .version(1L); - - // Write the traces via bulk processor - CompletableFuture<Void> future1 = processor.add(trace1); - CompletableFuture<Void> future2 = processor.add(trace2); - CompletableFuture<Void> future3 = processor.add(trace3); - - CompletableFuture.allOf(future1, future2, future3).get(10, TimeUnit.SECONDS); + StreamObserver<BanyandbTrace.WriteRequest> writeObserver + = client.getTraceServiceStub().write(new StreamObserver<BanyandbTrace.WriteResponse>() { + @Override + public void onNext(BanyandbTrace.WriteResponse writeResponse) { + Assert.assertEquals("STATUS_SUCCEED", writeResponse.getStatus()); + } + + @Override + public void onError(Throwable throwable) { + Assert.fail("write failed: " + throwable.getMessage()); + } + + @Override + public void onCompleted() { + + } + }); + try { + writeObserver.onNext(trace1.build()); + writeObserver.onNext(trace2.build()); + writeObserver.onNext(trace3.build()); + } finally { + writeObserver.onCompleted(); + } // Create trace query with order by start_time (no trace_id condition as it interferes with ordering) TraceQuery query = new TraceQuery(
