This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit dffec7a8a564750484fc844e11daf5ad315477bc Author: Megrez Lu <[email protected]> AuthorDate: Fri May 6 09:39:50 2022 +0800 support update --- .../storage/plugin/banyandb/BanyanDBBatchDAO.java | 3 ++ .../measure/BanyanDBMeasureUpdateRequest.java | 30 ++++++++++++++++++++ .../banyandb/measure/BanyanDBMetricsDAO.java | 33 ++++++++++++++++++++-- 3 files changed, 63 insertions(+), 3 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java index bb776c6dfe..b81a91c93a 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.PrepareRequest; import org.apache.skywalking.oap.server.library.util.CollectionUtils; import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureInsertRequest; +import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest; import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest; import java.util.List; @@ -79,6 +80,8 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> impleme this.streamBulkWriteProcessor.add(((BanyanDBStreamInsertRequest) prepareRequest).getStreamWrite()); } else if (prepareRequest instanceof BanyanDBMeasureInsertRequest) { this.measureBulkWriteProcessor.add(((BanyanDBMeasureInsertRequest) prepareRequest).getMeasureWrite()); + } else if (prepareRequest instanceof BanyanDBMeasureUpdateRequest) { + this.measureBulkWriteProcessor.add(((BanyanDBMeasureUpdateRequest) prepareRequest).getMeasureWrite()); } return CompletableFuture.completedFuture(null); }).toArray(CompletableFuture[]::new)); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java new file mode 100644 index 0000000000..2ff1fae015 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMeasureUpdateRequest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.apache.skywalking.banyandb.v1.client.MeasureWrite; +import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; + +@RequiredArgsConstructor +@Getter +public class BanyanDBMeasureUpdateRequest implements UpdateRequest { + private final MeasureWrite measureWrite; +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index 3f14de822c..492ed38d88 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.skywalking.oap.server.storage.plugin.banyandb.measure; import lombok.extern.slf4j.Slf4j; @@ -13,7 +31,6 @@ import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraf import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics; import org.apache.skywalking.oap.server.core.storage.IMetricsDAO; import org.apache.skywalking.oap.server.core.storage.model.Model; -import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder; import org.apache.skywalking.oap.server.library.client.request.InsertRequest; import org.apache.skywalking.oap.server.library.client.request.UpdateRequest; @@ -98,7 +115,17 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD @Override public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics) throws IOException { - return new UpdateRequest() { - }; + log.info("prepare to update {}", model.getName()); + MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(model.getName()); + if (schema == null) { + throw new IOException(model.getName() + " is not registered"); + } + MeasureWrite measureWrite = new MeasureWrite(schema.getMetadata().getGroup(), // group name + model.getName(), // index-name + TimeBucket.getTimestamp(metrics.getTimeBucket(), model.getDownsampling())); // timestamp + final BanyanDBConverter.MeasureToStorage toStorage = new BanyanDBConverter.MeasureToStorage(schema, measureWrite); + storageBuilder.entity2Storage(metrics, toStorage); + toStorage.acceptID(metrics.id()); + return new BanyanDBMeasureUpdateRequest(toStorage.obtain()); } }
