This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch banyandb-process in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit e2b55fc73e3c1b063620fbc608f6ab71c67bbe0b Author: Gao Hongtao <[email protected]> AuthorDate: Tue Jun 27 09:39:31 2023 +0800 Handle StorageIDTag Signed-off-by: Gao Hongtao <[email protected]> --- .../banyandb/measure/BanyanDBMetricsDAO.java | 56 ++++++++++++++-------- 1 file changed, 37 insertions(+), 19 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java index 767da1cb92..d1350cfa64 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetricsDAO.java @@ -43,6 +43,7 @@ import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.AbstractB import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -68,25 +69,44 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD if (schema == null) { throw new IOException(model.getName() + " is not registered"); } - String tc = model.getBanyanDBModelExtension().getTimestampColumn(); - final String tsCol = Strings.isBlank(tc) ? TIME_BUCKET : tc; - long begin = 0L, end = 0L; final Map<String, List<String>> seriesIDColumns = new HashMap<>(); - model.getColumns().forEach(c -> { - BanyanDBExtension ext = c.getBanyanDBExtension(); - if (ext == null) { - return; - } - if (ext.isShardingKey()) { - seriesIDColumns.put(c.getColumnName().getName(), new ArrayList<>()); + if (model.getBanyanDBModelExtension().isStoreIDTag()) { + seriesIDColumns.put(BanyanDBConverter.ID, new ArrayList<>()); + } else { + model.getColumns().forEach(c -> { + BanyanDBExtension ext = c.getBanyanDBExtension(); + if (ext == null) { + return; + } + if (ext.isShardingKey()) { + seriesIDColumns.put(c.getColumnName().getName(), new ArrayList<>()); + } + }); + if (seriesIDColumns.isEmpty()) { + seriesIDColumns.put(ENTITY_ID, new ArrayList<>()); } - }); - if (seriesIDColumns.isEmpty()) { - seriesIDColumns.put(ENTITY_ID, new ArrayList<>()); } + + String tc = model.getBanyanDBModelExtension().getTimestampColumn(); + final String tsCol = Strings.isBlank(tc) ? TIME_BUCKET : tc; + long begin = 0L, end = 0L; StringBuilder idStr = new StringBuilder(); for (Metrics m : metrics) { - AnalyticalResult result = analyze(m, tsCol, seriesIDColumns); + List<StorageID.Fragment> fragments = m.id().read(); + if (model.getBanyanDBModelExtension().isStoreIDTag()) { + if (fragments.size() != 1) { + log.error("[{}]fragments' size is more than expected", fragments); + continue; + } + Object val = fragments.get(0).getValue(); + fragments = Arrays.asList(new StorageID.Fragment( + new String[]{BanyanDBConverter.ID}, + String.class, + true, + val)); + } + AnalyticalResult result = analyze(fragments, tsCol, seriesIDColumns); + idStr.append(result.cols()).append("=").append(m.id().build()).append(","); if (!result.success) { continue; @@ -180,9 +200,7 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD } } - private AnalyticalResult analyze(Metrics m, String tsCol, Map<String, List<String>> seriesIDColumns) { - StorageID id = m.id(); - List<StorageID.Fragment> fragments = id.read(); + private AnalyticalResult analyze(List<StorageID.Fragment> fragments, String tsCol, Map<String, List<String>> seriesIDColumns) { AnalyticalResult result = new AnalyticalResult(); for (StorageID.Fragment f : fragments) { Optional<String[]> cols = f.getName(); @@ -202,12 +220,12 @@ public class BanyanDBMetricsDAO extends AbstractBanyanDBDAO implements IMetricsD Preconditions.checkState(f.getType().equals(String.class)); seriesIDColumns.get(col).add((String) f.getValue()); } else { - log.error("col [{}] in fragment [{}] in id [{}] is not ts or seriesID", col, f, id.build()); + log.error("col [{}] in fragment [{}] id [{}] is not ts or seriesID", col, f, fragments); return result; } } } else { - log.error("fragment [{}] in id [{}] doesn't contains cols", f, id.build()); + log.error("fragment [{}] in id [{}] doesn't contains cols", f, fragments); return result; } }
