This is an automated email from the ASF dual-hosted git repository.

lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit dffec7a8a564750484fc844e11daf5ad315477bc
Author: Megrez Lu <[email protected]>
AuthorDate: Fri May 6 09:39:50 2022 +0800

    support update
---
 .../storage/plugin/banyandb/BanyanDBBatchDAO.java  |  3 ++
 .../measure/BanyanDBMeasureUpdateRequest.java      | 30 ++++++++++++++++++++
 .../banyandb/measure/BanyanDBMetricsDAO.java       | 33 ++++++++++++++++++++--
 3 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
index bb776c6dfe..b81a91c93a 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureInsertRequest;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
 
 import java.util.List;
@@ -79,6 +80,8 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme
                     this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) prepareRequest).getStreamWrite());
                 } else if (prepareRequest instanceof BanyanDBMeasureInsertRequest) {
                     this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) prepareRequest).getMeasureWrite());
+                } else if (prepareRequest instanceof BanyanDBMeasureUpdateRequest) {
+                    this.measureBulkWriteProcessor.add(((BanyanDBMeasureUpdateRequest) prepareRequest).getMeasureWrite());
                 }
                 return CompletableFuture.completedFuture(null);
             }).toArray(CompletableFuture[]::new));
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
new file mode 100644
index 0000000000..2ff1fae015
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.skywalking.banyandb.v1.client.MeasureWrite;
+import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
+
+@RequiredArgsConstructor
+@Getter
+public class BanyanDBMeasureUpdateRequest implements UpdateRequest {
+    private final MeasureWrite measureWrite;
+}
diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
index 3f14de822c..492ed38d88 100644
--- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
+++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure;
 
 import lombok.extern.slf4j.Slf4j;
@@ -13,7 +31,6 @@ import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraf
 import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
 import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
 import org.apache.skywalking.oap.server.core.storage.model.Model;
-import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
 import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.UpdateRequest;
@@ -98,7 +115,17 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD
 
     @Override
     public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException {
-        return new UpdateRequest() {
-        };
+        log.info("prepare to update {}", model.getName());
+        MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model.getName());
+        if (schema == null) {
+            throw new IOException(model.getName() + " is not registered");
+        }
+        MeasureWrite measureWrite = new MeasureWrite(schema.getMetadata().getGroup(), // group name
+                model.getName(), // index-name
+                TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp
+        final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite);
+        storageBuilder.entity2Storage(metrics, toStorage);
+        toStorage.acceptID(metrics.id());
+        return new BanyanDBMeasureUpdateRequest(toStorage.obtain());
     }
 }

Reply via email to