This is an automated email from the ASF dual-hosted git repository. lujiajing pushed a commit to branch banyandb-integration-stream in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 99357ad9614c348ad2b92878e8e61c2d2df6a35e Author: Megrez Lu <[email protected]> AuthorDate: Thu Dec 9 18:21:57 2021 +0800 add installer --- .../src/main/resources/application.yml | 8 + .../plugin/banyandb/BanyanDBIndexInstaller.java | 44 +++++ .../plugin/banyandb/BanyanDBStorageClient.java | 33 ++++ .../plugin/banyandb/BanyanDBStorageProvider.java | 3 + .../storage/plugin/banyandb/StreamMetaInfo.java | 191 +++++++++++++++++++++ .../metadata/index_rules/segment/db.instance.json | 13 ++ .../src/main/resources/metadata/segment.json | 93 ++++++++++ 7 files changed, 385 insertions(+) diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml index d8430650b7..12db39b01b 100755 --- a/oap-server/server-starter/src/main/resources/application.yml +++ b/oap-server/server-starter/src/main/resources/application.yml @@ -260,6 +260,14 @@ storage: storageGroup: ${SW_STORAGE_IOTDB_STORAGE_GROUP:root.skywalking} sessionPoolSize: ${SW_STORAGE_IOTDB_SESSIONPOOL_SIZE:8} # If it's zero, the SessionPool size will be 2*CPU_Cores fetchTaskLogMaxSize: ${SW_STORAGE_IOTDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request + banyandb: + host: ${SW_STORAGE_BANYANDB_HOST:127.0.0.1} + port: ${SW_STORAGE_BANYANDB_PORT:17912} + group: ${SW_STORAGE_BANYANDB_GROUP:default} + maxBulkSize: ${SW_STORAGE_BANYANDB_MAX_BULK_SIZE:5000} + flushInterval: ${SW_STORAGE_BANYANDB_FLUSH_INTERVAL:15} + concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15} + fetchTaskLogMaxSize: ${SW_STORAGE_BANYANDB_FETCH_TASK_LOG_MAX_SIZE:1000} # the max number of fetch task log in a request agent-analyzer: selector: ${SW_AGENT_ANALYZER:default} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java new file mode 100644 index 0000000000..ffc5aaf586 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb; + +import org.apache.skywalking.oap.server.core.storage.StorageException; +import org.apache.skywalking.oap.server.core.storage.model.Model; +import org.apache.skywalking.oap.server.core.storage.model.ModelInstaller; +import org.apache.skywalking.oap.server.library.client.Client; +import org.apache.skywalking.oap.server.library.module.ModuleManager; + +public class BanyanDBIndexInstaller extends ModelInstaller { + public BanyanDBIndexInstaller(Client client, ModuleManager moduleManager) { + super(client, moduleManager); + } + + @Override + protected boolean isExists(Model model) throws StorageException { + return false; + } + + @Override + protected void createTable(Model model) throws StorageException { + StreamMetaInfo metaInfo = StreamMetaInfo.addModel(model); + if (metaInfo != null) { + ((BanyanDBStorageClient) client).createStream(metaInfo); + } + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java index 3ad4daf918..2ee45b12f0 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.skywalking.oap.server.storage.plugin.banyandb; import org.apache.skywalking.banyandb.v1.client.BanyanDBClient; @@ -5,12 +23,17 @@ import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor; import org.apache.skywalking.banyandb.v1.client.StreamQuery; import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse; import org.apache.skywalking.banyandb.v1.client.StreamWrite; +import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule; +import org.apache.skywalking.banyandb.v1.client.metadata.Stream; import org.apache.skywalking.oap.server.library.client.Client; import org.apache.skywalking.oap.server.library.client.healthcheck.DelegatedHealthChecker; import org.apache.skywalking.oap.server.library.client.healthcheck.HealthCheckable; import org.apache.skywalking.oap.server.library.util.HealthChecker; import java.io.IOException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; /** * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient}, @@ -45,6 +68,16 @@ public class BanyanDBStorageClient implements Client, HealthCheckable { } } + public void createStream(StreamMetaInfo streamMetaInfo) { + Stream stm = this.client.define(streamMetaInfo.getStream()); + if (stm != null) { + // TODO: should be fixed in SDK + this.client.defineIndexRules(stm, ZonedDateTime.from(Instant.now()), + ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC), + streamMetaInfo.getIndexRules().toArray(new IndexRule[]{})); + } + } + public void write(StreamWrite streamWrite) { this.client.write(streamWrite); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java index aade57b62b..64f8e96cbf 100644 --- a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java @@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageDAO; import org.apache.skywalking.oap.server.core.storage.StorageModule; import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressAliasDAO; import org.apache.skywalking.oap.server.core.storage.management.UITemplateManagementDAO; +import org.apache.skywalking.oap.server.core.storage.model.ModelCreator; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskLogQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO; import org.apache.skywalking.oap.server.core.storage.profile.IProfileThreadSnapshotQueryDAO; @@ -122,6 +123,8 @@ public class BanyanDBStorageProvider extends ModuleProvider { this.client.registerChecker(healthChecker); try { this.client.connect(); + BanyanDBIndexInstaller installer = new BanyanDBIndexInstaller(client, getManager()); + getManager().find(CoreModule.NAME).provider().getService(ModelCreator.class).addModelListener(installer); } catch (Exception e) { throw new ModuleStartException(e.getMessage(), e); } diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java new file mode 100644 index 0000000000..1fbf2eb636 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/StreamMetaInfo.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.storage.plugin.banyandb; + +import com.google.common.base.Charsets; +import com.google.common.io.CharStreams; +import com.google.protobuf.util.JsonFormat; +import lombok.Builder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.banyandb.database.v1.metadata.BanyandbMetadata; +import org.apache.skywalking.banyandb.v1.client.metadata.Duration; +import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule; +import org.apache.skywalking.banyandb.v1.client.metadata.Stream; +import org.apache.skywalking.banyandb.v1.client.metadata.TagFamilySpec; +import org.apache.skywalking.oap.server.core.storage.model.Model; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Getter +@Builder +@Slf4j +public class StreamMetaInfo { + public static final String TAG_FAMILY_SEARCHABLE = "searchable"; + public static final String TAG_FAMILY_DATA = "data"; + + private static final Map<String, StreamMetaInfo> STREAMS = new HashMap<>(); + + private final Model model; + + /** + * stream is the metadata to be used for schema creation, + * 1. Read json from resources/metadata/{model.name}.json and deserialize to protobuf, + * 2. Iterate over tag families, + * 3. Iterate over tags in each tag family + * 4. + */ + private final Stream stream; + + private final List<IndexRule> indexRules; + + public static StreamMetaInfo addModel(Model model) { + BanyandbMetadata.Stream pbStream = parseStreamFromJSON(model.getName()); + if (pbStream == null) { + log.warn("fail to find the stream schema {}", model.getName()); + return null; + } + BanyandbMetadata.Duration duration = pbStream.getOpts().getTtl(); + Duration ttl = fromProtobuf(duration); + final Stream stream = new Stream(pbStream.getMetadata().getName(), pbStream.getOpts().getShardNum(), ttl); + + List<IndexRule> indexRules = new ArrayList<>(); + + stream.setEntityTagNames(pbStream.getEntity().getTagNamesList()); + for (BanyandbMetadata.TagFamilySpec pbTagFamilySpec : pbStream.getTagFamiliesList()) { + final TagFamilySpec tagFamilySpec = new TagFamilySpec(pbTagFamilySpec.getName()); + final boolean needIndexParse = pbTagFamilySpec.getName().equals(TAG_FAMILY_SEARCHABLE); + for (final BanyandbMetadata.TagSpec pbTagSpec : pbTagFamilySpec.getTagsList()) { + tagFamilySpec.addTagSpec(parseTagSpec(pbTagSpec)); + + // if the tag family equals to "searchable", build index rules + if (needIndexParse) { + BanyandbMetadata.IndexRule pbIndexRule = parseIndexRulesFromJSON(model.getName(), pbTagSpec.getName()); + if (pbIndexRule == null) { + log.warn("fail to find the index rule for {}", pbTagSpec.getName()); + continue; + } + IndexRule.IndexType indexType = fromProtobuf(pbIndexRule.getType()); + IndexRule.IndexLocation indexLocation = fromProtobuf(pbIndexRule.getLocation()); + IndexRule indexRule = new IndexRule(pbIndexRule.getMetadata().getName(), indexType, indexLocation); + indexRule.setTags(new ArrayList<>(pbIndexRule.getTagsList())); + indexRules.add(indexRule); + } + } + } + + return StreamMetaInfo.builder().model(model).stream(stream).indexRules(indexRules).build(); + } + + private static TagFamilySpec.TagSpec parseTagSpec(BanyandbMetadata.TagSpec pbTagSpec) { + switch (pbTagSpec.getType()) { + case TAG_TYPE_INT: + return TagFamilySpec.TagSpec.newIntTag(pbTagSpec.getName()); + case TAG_TYPE_INT_ARRAY: + return TagFamilySpec.TagSpec.newIntArrayTag(pbTagSpec.getName()); + case TAG_TYPE_STRING: + return TagFamilySpec.TagSpec.newStringTag(pbTagSpec.getName()); + case TAG_TYPE_STRING_ARRAY: + return TagFamilySpec.TagSpec.newStringArrayTag(pbTagSpec.getName()); + case TAG_TYPE_DATA_BINARY: + return TagFamilySpec.TagSpec.newBinaryTag(pbTagSpec.getName()); + default: + throw new IllegalArgumentException("unrecognized tag type"); + } + } + + private static BanyandbMetadata.Stream parseStreamFromJSON(String name) { + try { + InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream("metadata/" + name + ".json"); + if (is == null) { + return null; + } + String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8)); + BanyandbMetadata.Stream.Builder b = BanyandbMetadata.Stream.newBuilder(); + JsonFormat.parser().merge(result, b); + return b.build(); + } catch (IOException ioEx) { + log.error("fail to read json", ioEx); + return null; + } + } + + private static BanyandbMetadata.IndexRule parseIndexRulesFromJSON(String streamName, String name) { + try { + InputStream is = StreamMetaInfo.class.getClassLoader().getResourceAsStream(String.join("/", + new String[]{"metadata", "index_rules", streamName, name + ".json"})); + if (is == null) { + return null; + } + String result = CharStreams.toString(new InputStreamReader(is, Charsets.UTF_8)); + BanyandbMetadata.IndexRule.Builder b = BanyandbMetadata.IndexRule.newBuilder(); + JsonFormat.parser().merge(result, b); + return b.build(); + } catch (IOException ioEx) { + log.error("fail to read json", ioEx); + return null; + } + } + + // TODO: change modifier to public in SDK + static Duration fromProtobuf(BanyandbMetadata.Duration duration) { + switch (duration.getUnit()) { + case DURATION_UNIT_DAY: + return Duration.ofDays(duration.getVal()); + case DURATION_UNIT_HOUR: + return Duration.ofHours(duration.getVal()); + case DURATION_UNIT_MONTH: + return Duration.ofMonths(duration.getVal()); + case DURATION_UNIT_WEEK: + return Duration.ofWeeks(duration.getVal()); + default: + throw new IllegalArgumentException("unrecognized DurationUnit"); + } + } + + // TODO: change modifier to public in SDK + private static IndexRule.IndexType fromProtobuf(BanyandbMetadata.IndexRule.Type type) { + switch (type) { + case TYPE_TREE: + return IndexRule.IndexType.TREE; + case TYPE_INVERTED: + return IndexRule.IndexType.INVERTED; + default: + throw new IllegalArgumentException("unrecognized index type"); + } + } + + // TODO: change modifier to public in SDK + private static IndexRule.IndexLocation fromProtobuf(BanyandbMetadata.IndexRule.Location loc) { + switch (loc) { + case LOCATION_GLOBAL: + return IndexRule.IndexLocation.GLOBAL; + case LOCATION_SERIES: + return IndexRule.IndexLocation.SERIES; + default: + throw new IllegalArgumentException("unrecognized index location"); + } + } +} diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json new file mode 100644 index 0000000000..662cff2eac --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/index_rules/segment/db.instance.json @@ -0,0 +1,13 @@ +{ + "metadata": { + "id": 1, + "name": "db.instance", + "group": "default" + }, + "tags": [ + "db.instance" + ], + "type": "TYPE_INVERTED", + "location": "LOCATION_SERIES", + "updated_at": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file diff --git a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json new file mode 100644 index 0000000000..1317d9ecf0 --- /dev/null +++ b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/resources/metadata/segment.json @@ -0,0 +1,93 @@ +{ + "metadata": { + "name": "segment", + "group": "default" + }, + "tag_families": [ + { + "name": "data", + "tags": [ + { + "name": "data_binary", + "type": "TAG_TYPE_DATA_BINARY" + } + ] + }, + { + "name": "searchable", + "tags": [ + { + "name": "trace_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "state", + "type": "TAG_TYPE_INT" + }, + { + "name": "service_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "service_instance_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "endpoint_id", + "type": "TAG_TYPE_STRING" + }, + { + "name": "duration", + "type": "TAG_TYPE_INT" + }, + { + "name": "start_time", + "type": "TAG_TYPE_INT" + }, + { + "name": "http.method", + "type": "TAG_TYPE_STRING" + }, + { + "name": "status_code", + "type": "TAG_TYPE_STRING" + }, + { + "name": "db.type", + "type": "TAG_TYPE_STRING" + }, + { + "name": "db.instance", + "type": "TAG_TYPE_STRING" + }, + { + "name": "mq.queue", + "type": "TAG_TYPE_STRING" + }, + { + "name": "mq.topic", + "type": "TAG_TYPE_STRING" + }, + { + "name": "mq.broker", + "type": "TAG_TYPE_STRING" + } + ] + } + ], + "entity": { + "tag_names": [ + "service_id", + "service_instance_id", + "state" + ] + }, + "opts": { + "shard_num": 2, + "ttl": { + "val": 7, + "unit": "DURATION_UNIT_DAY" + } + }, + "updated_at_nanoseconds": "2021-04-15T01:30:15.01Z" +} \ No newline at end of file
