This is an automated email from the ASF dual-hosted git repository.
lujiajing pushed a commit to branch banyandb-integration-stream
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/banyandb-integration-stream by
this push:
new b043af38d5 add measure id and check sharding keys
b043af38d5 is described below
commit b043af38d5a35a12fd4375dbcb9bbfbbebe4a551
Author: Megrez Lu <[email protected]>
AuthorDate: Sat Apr 30 16:55:24 2022 +0800
add measure id and check sharding keys
---
.../storage/plugin/banyandb/BanyanDBConverter.java | 2 +-
.../storage/plugin/banyandb/MetadataRegistry.java | 42 ++++++++++++++++------
2 files changed, 32 insertions(+), 12 deletions(-)
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
index c512b5653c..c3c936ef84 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConverter.java
@@ -45,7 +45,7 @@ public class BanyanDBConverter {
@Override
public Object get(String fieldName) {
MetadataRegistry.ColumnSpec spec = schema.getSpec(fieldName);
- if (double.class.equals(spec.getModelColumn().getType())) {
+ if (double.class.equals(spec.getColumnClass())) {
return ByteUtil.bytes2Double(rowEntity.getTagValue(fieldName));
} else {
return rowEntity.getTagValue(fieldName);
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
index 6c6fbb7658..3ff4cb2833 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
public enum MetadataRegistry {
INSTANCE;
+ private static final String ID = "id";
private final Map<String, Schema> registry = new ConcurrentHashMap<>();
public NamedSchema<?> registerModel(Model model, ConfigService
configService) {
@@ -87,25 +88,33 @@ public enum MetadataRegistry {
.filter(Objects::nonNull)
.collect(Collectors.toList());
-
if (partialMetadata.getKind() == Kind.STREAM) {
final Stream.Builder builder =
Stream.create(partialMetadata.getGroup(), partialMetadata.getName());
+ if (entities.isEmpty()) {
+ log.warn("sharding keys of model[stream.{}] must not be
empty", model.getName());
+// throw new IllegalStateException("sharding keys of model[" +
model.getName() + "] must not be empty");
+ }
builder.setEntityRelativeTags(entities);
builder.addTagFamilies(tagFamilySpecs);
builder.addIndexes(indexRules);
-
registry.put(model.getName(), schemaBuilder.build());
return builder.build();
} else {
final Measure.Builder builder =
Measure.create(partialMetadata.getGroup(), partialMetadata.getName(),
downSamplingDuration(model.getDownsampling()));
- builder.setEntityRelativeTags(entities);
+ if (entities.isEmpty()) { // if shardingKeys is empty, for
measure, we can use ID as a single sharding key.
+ builder.setEntityRelativeTags(ID);
+ } else {
+ builder.setEntityRelativeTags(entities);
+ }
builder.addTagFamilies(tagFamilySpecs);
builder.addIndexes(indexRules);
// parse and set field
Optional<ValueColumnMetadata.ValueColumn> valueColumnOpt =
ValueColumnMetadata.INSTANCE
.readValueColumnDefinition(model.getName());
valueColumnOpt.ifPresent(valueColumn ->
builder.addField(parseFieldSpec(modelColumnMap.get(valueColumn.getValueCName()),
valueColumn)));
+ // register ID
+ schemaBuilder.spec(ID, new ColumnSpec(ColumnType.TAG,
String.class));
registry.put(model.getName(), schemaBuilder.build());
return builder.build();
@@ -194,14 +203,14 @@ public enum MetadataRegistry {
.readValueColumnDefinition(model.getName());
for (final ModelColumn col : model.getColumns()) {
if (valueColumnOpt.isPresent() &&
valueColumnOpt.get().getValueCName().equals(col.getColumnName().getStorageName()))
{
- builder.spec(col.getColumnName().getStorageName(), new
ColumnSpec(ColumnType.FIELD, col));
+ builder.spec(col.getColumnName().getStorageName(), new
ColumnSpec(ColumnType.FIELD, col.getType()));
continue;
}
final TagFamilySpec.TagSpec tagSpec = parseTagSpec(col);
if (tagSpec == null) {
continue;
}
- builder.spec(col.getColumnName().getStorageName(), new
ColumnSpec(ColumnType.TAG, col));
+ builder.spec(col.getColumnName().getStorageName(), new
ColumnSpec(ColumnType.TAG, col.getType()));
if (col.shouldIndex()) {
// build indexRule
IndexRule indexRule = parseIndexRule(tagSpec.getTagName(),
col);
@@ -316,11 +325,22 @@ public enum MetadataRegistry {
}
private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata>
tagMetadataList) {
- return
tagMetadataList.stream().collect(Collectors.groupingBy(tagMetadata ->
tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() :
PartialMetadata.this.nonIndexFamily()))
- .entrySet().stream()
- .map(entry -> TagFamilySpec.create(entry.getKey())
-
.addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList())).build())
- .collect(Collectors.toList());
+ Map<String, List<TagMetadata>> tagMetadataMap =
tagMetadataList.stream()
+ .collect(Collectors.groupingBy(tagMetadata ->
tagMetadata.isIndex() ? PartialMetadata.this.indexFamily() :
PartialMetadata.this.nonIndexFamily()));
+
+ final List<TagFamilySpec> tagFamilySpecs = new
ArrayList<>(tagMetadataMap.size());
+ for (final Map.Entry<String, List<TagMetadata>> entry :
tagMetadataMap.entrySet()) {
+ final TagFamilySpec.Builder b =
TagFamilySpec.create(entry.getKey())
+
.addTagSpecs(entry.getValue().stream().map(TagMetadata::getTagSpec).collect(Collectors.toList()));
+ if (this.getKind() == Kind.MEASURE &&
entry.getKey().equals(this.indexFamily())) {
+ // append measure ID, but it should not generate an index
in the client side.
+ // BanyanDB will take care of the ID index registration.
+ b.addTagSpec(TagFamilySpec.TagSpec.newIDTag(ID));
+ }
+ tagFamilySpecs.add(b.build());
+ }
+
+ return tagFamilySpecs;
}
public Group getOrCreateGroup(BanyanDBClient client) throws
BanyanDBException {
@@ -392,7 +412,7 @@ public enum MetadataRegistry {
@Getter
public static class ColumnSpec {
private final ColumnType columnType;
- private final ModelColumn modelColumn;
+ private final Class<?> columnClass;
}
public enum ColumnType {