This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit c0f606c35663a3ac4c8870a58471bbb97a52ad5f Author: Megrez Lu <[email protected]> AuthorDate: Sat Apr 30 16:55:24 2022 +0800 add measure id and check sharding keys --- .../storage/plugin/banyandb/BanyanDBConverter.java | 2 +- .../storage/plugin/banyandb/MetadataRegistry.java | 42 ++++++++++++++++------ 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java index c512b5653c..c3c936ef84 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java @@ -45,7 +45,7 @@ public class BanyanDBConverter { @Override public Object get(String fieldName) { MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName); - if (double.class.equals(spec.getModelColumn().getType())) { + if (double.class.equals(spec.getColumnClass())) { return ByteUtil.bytes2Double(rowEntity.getTagValue(fieldName)); } else { return rowEntity.getTagValue(fieldName); diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java index 6c6fbb7658..3ff4cb2833 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java @@ -67,6 +67,7 @@ import java.util.stream.Collectors; public enum MetadataRegistry { INSTANCE; + private static final String ID = "id"; private final Map<String, Schema> registry = new ConcurrentHashMap<>(); public NamedSchema<?> registerModel(Model model, ConfigService configService) { @@ -87,25 +88,33 @@ public enum MetadataRegistry { .filter(Objects::nonNull) .collect(Collectors.toList()); - if (partialMetadata.getKind() == Kind.STREAM) { final Stream.Builder builder = Stream.create(partialMetadata.getGroup(), partialMetadata.getName()); + if (entities.isEmpty()) { + log.warn("sharding keys of model[stream.{}] must not be empty", model.getName()); +// throw new IllegalStateException("sharding keys of model[" + model.getName() + "] must not be empty"); + } builder.setEntityRelativeTags(entities); builder.addTagFamilies(tagFamilySpecs); builder.addIndexes(indexRules); - registry.put(model.getName(), schemaBuilder.build()); return builder.build(); } else { final Measure.Builder builder = Measure.create(partialMetadata.getGroup(), partialMetadata.getName(), downSamplingDuration(model.getDownsampling())); - builder.setEntityRelativeTags(entities); + if (entities.isEmpty()) { // if shardingKeys is empty, for measure, we can use ID as a single sharding key. + builder.setEntityRelativeTags(ID); + } else { + builder.setEntityRelativeTags(entities); + } builder.addTagFamilies(tagFamilySpecs); builder.addIndexes(indexRules); // parse and set field Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt = ValueColumnMetadata.INSTANCE .readValueColumnDefinition(model.getName()); valueColumnOpt.ifPresent(valueColumn -> builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()), valueColumn))); + // register ID + schemaBuilder.spec(ID, new ColumnSpec(ColumnType.TAG, String.class)); registry.put(model.getName(), schemaBuilder.build()); return builder.build(); @@ -194,14 +203,14 @@ public enum MetadataRegistry { .readValueColumnDefinition(model.getName()); for (final ModelColumn col : model.getColumns()) { if (valueColumnOpt.isPresent() && valueColumnOpt.get().getValueCName().equals(col.getColumnName().getStorageName())) { - builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.FIELD, col)); + builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.FIELD, col.getType())); continue; } final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col); if (tagSpec == null) { continue; } - builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col)); + builder.spec(col.getColumnName().getStorageName(), new ColumnSpec(ColumnType.TAG, col.getType())); if (col.shouldIndex()) { // build indexRule IndexRule indexRule = parseIndexRule(tagSpec.getTagName(), col); @@ -316,11 +325,22 @@ public enum MetadataRegistry { } private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList) { - return tagMetadataList.stream().collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() : PartialMetadata.this.nonIndexFamily())) - .entrySet().stream() - .map(entry -> TagFamilySpec.create(entry.getKey()) - .addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList())).build()) - .collect(Collectors.toList()); + Map<String, List<TagMetadata>> tagMetadataMap = tagMetadataList.stream() + .collect(Collectors.groupingBy(tagMetadata -> tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() : PartialMetadata.this.nonIndexFamily())); + + final List<TagFamilySpec> tagFamilySpecs = new ArrayList<>(tagMetadataMap.size()); + for (final Map.Entry<String, List<TagMetadata>> entry : tagMetadataMap.entrySet()) { + final TagFamilySpec.Builder b = TagFamilySpec.create(entry.getKey()) + .addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList())); + if (this.getKind() == Kind.MEASURE && entry.getKey().equals(this.indexFamily())) { + // append measure ID, but it should not generate an index in the client side. + // BanyanDB will take care of the ID index registration. + b.addTagSpec(TagFamilySpec.TagSpec.newIDTag(ID)); + } + tagFamilySpecs.add(b.build()); + } + + return tagFamilySpecs; } public Group getOrCreateGroup(BanyanDBClient client) throws BanyanDBException { @@ -392,7 +412,7 @@ public enum MetadataRegistry { @Getter public static class ColumnSpec { private final ColumnType columnType; - private final ModelColumn modelColumn; + private final Class<?> columnClass; } public enum ColumnType {
