lujiajing1126 commented on code in PR #47:
URL: 
https://github.com/apache/skywalking-banyandb-java-client/pull/47#discussion_r1327122199


##########
src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java:
##########
@@ -34,29 +40,50 @@
 @ThreadSafe
 public class StreamBulkWriteProcessor extends 
AbstractBulkWriteProcessor<BanyandbStream.WriteRequest,
         StreamServiceGrpc.StreamServiceStub> {
+    private final BanyanDBClient client;
+
     /**
      * Create the processor.
      *
-     * @param serviceStub   stub for gRPC call.
+     * @param client        the client
      * @param maxBulkSize   the max bulk size for the flush operation
      * @param flushInterval if given maxBulkSize is not reached in this 
period, the flush would be trigger
      *                      automatically. Unit is second.
      * @param concurrency   the number of concurrency would run for the flush 
max.
      */
     protected StreamBulkWriteProcessor(
-            final StreamServiceGrpc.StreamServiceStub serviceStub,
+            final BanyanDBClient client,
             final int maxBulkSize,
             final int flushInterval,
             final int concurrency) {
-        super(serviceStub, "StreamBulkWriteProcessor", maxBulkSize, 
flushInterval, concurrency);
+        super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", 
maxBulkSize, flushInterval, concurrency);
+        this.client = client;
     }
 
     @Override
     protected StreamObserver<BanyandbStream.WriteRequest> 
buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub, 
CompletableFuture<Void> batch) {
         return stub.write(
                 new StreamObserver<BanyandbStream.WriteResponse>() {
+                    private final Set<String> schemaExpired = new HashSet<>();
+
                     @Override
                     public void onNext(BanyandbStream.WriteResponse 
writeResponse) {
+                        switch (writeResponse.getStatus()) {
+                            case STATUS_EXPIRED_SCHEMA:
+                                BanyandbCommon.Metadata metadata = 
writeResponse.getMetadata();
+                                String schemaKey = metadata.getGroup() + "." + 
metadata.getName();
+                                if (!schemaExpired.contains(schemaKey)) {
+                                    log.warn("The schema {} is expired, trying 
update the schema...", schemaKey);
+                                    try {
+                                        client.findStream(metadata.getGroup(), 
metadata.getName());
+                                        schemaExpired.add(schemaKey);
+                                    } catch (BanyanDBException e) {
+                                        throw new RuntimeException(e);

Review Comment:
   What would happen if we throw an exception here?



##########
src/main/proto/banyandb/v1/banyandb-stream.proto:
##########
@@ -84,13 +84,22 @@ message ElementValue {
 }
 
 message WriteRequest {
-  // the metadata is only required in the first write.
+  // the metadata is required.
   common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
   // the element is required.
   ElementValue element = 2 [(validate.rules).message.required = true];
+  // the message_id is required.
+  uint64 message_id = 3 [(validate.rules).uint64.gt = 0];
 }
 
-message WriteResponse {}
+message WriteResponse {
+  // the message_id from request.
+  uint64 message_id = 1 [(validate.rules).uint64.gt = 0];
+  // status indicates the request processing result
+  model.v1.Status status = 2 [(validate.rules).enum.defined_only = true];
+  // the metadata from request when request fails
+  common.v1.Metadata metadata = 3 [(validate.rules).message.required = true];

Review Comment:
   > // the metadata from request when request fails
   
   So it is not required, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to