hanahmily commented on code in PR #377: URL: https://github.com/apache/skywalking-banyandb/pull/377#discussion_r1462695433
########## banyand/stream/series_span.go: ########## @@ -0,0 +1,62 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +type seriesSpan struct { + l *logger.Logger + timeRange *timestamp.TimeRange + tableWrappers []storage.TSTableWrapper[*tsTable] + seriesID common.SeriesID +} + +func (s *seriesSpan) Build() *seekerBuilder { + return &seekerBuilder{ + seriesSpan: s, + l: logger.GetLogger("seeker-builder"), + } +} + +func (s *seriesSpan) Close() { + for _, tw := range s.tableWrappers { + tw.DecRef() + } +} + Review Comment: ```suggestion ``` ########## pkg/index/index.go: ########## @@ -249,6 +249,11 @@ type SeriesStore interface { SearchWildcard([]byte) ([]Series, error) } +// ElementStore is an abstract of a element repository. +type ElementStore interface { Review Comment: It appears that using the wrapped `ElementStore` is unnecessary. Please use `Store` directly instead. 
########## banyand/liaison/grpc/stream.go: ########## @@ -116,9 +116,8 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error { Request: writeEntity, ShardId: uint32(shardID), SeriesHash: tsdb.HashEntity(entity), - } - if s.log.Debug().Enabled() { - iwr.EntityValues = tagValues.Encode() + // TODO: remove the first value (stream name) of tagValues Review Comment: Why do you mark a TODO here? ########## banyand/stream/stream.go: ########## @@ -56,33 +101,245 @@ func (s *stream) Close() error { func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() + s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetTagFamilies(), s.indexRules) +} + +func (s *stream) ParseElementIDDeprecated(item tsdb.Item) (string, error) { Review Comment: Do you still need it? ########## banyand/stream/stream.go: ########## @@ -56,33 +101,245 @@ func (s *stream) Close() error { func (s *stream) parseSpec() { s.name, s.group = s.schema.GetMetadata().GetName(), s.schema.GetMetadata().GetGroup() + s.indexRuleLocators = partition.ParseIndexRuleLocators(s.schema.GetTagFamilies(), s.indexRules) +} + +func (s *stream) ParseElementIDDeprecated(item tsdb.Item) (string, error) { + rawBytes, err := item.Val() + if err != nil { + return "", err + } + return string(rawBytes), nil +} + +// NewItemIter returns a ItemIterator which mergers several tsdb.Iterator by input sorting order. +func NewItemIter(iters []*searcherIterator, s modelv1.Sort) itersort.Iterator[item] { Review Comment: ```suggestion func newItemIter(iters []*searcherIterator, s modelv1.Sort) itersort.Iterator[item] { ``` ########## banyand/stream/series_span.go: ########## @@ -0,0 +1,62 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "context" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +type seriesSpan struct { + l *logger.Logger + timeRange *timestamp.TimeRange + tableWrappers []storage.TSTableWrapper[*tsTable] + seriesID common.SeriesID +} + +func (s *seriesSpan) Build() *seekerBuilder { + return &seekerBuilder{ + seriesSpan: s, + l: logger.GetLogger("seeker-builder"), + } +} + +func (s *seriesSpan) Close() { + for _, tw := range s.tableWrappers { + tw.DecRef() + } +} + +func newSeriesSpan(ctx context.Context, timeRange *timestamp.TimeRange, tableWrappers []storage.TSTableWrapper[*tsTable], id common.SeriesID) *seriesSpan { Review Comment: Would you mind initializing the `seekerBuilder` directly? Creating a `seriesSpan` first and then injecting it into the `seekerBuilder` adds unnecessary overhead. Doing so would also let you drop `series_span.go` entirely. ########## banyand/stream/iter_builder.go: ########## @@ -0,0 +1,106 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership.
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package stream + +import ( + "time" + + "github.com/pkg/errors" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/logger" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" +) + +var ( + errUnspecifiedIndexType = errors.New("Unspecified index type") + rangeOpts = index.RangeOpts{} +) + +type filterFn func(item item) bool + +type seekerBuilder struct { Review Comment: Since `seekerBuilder` currently contains only a single method, would you be willing to turn it into a package-level function? This change would allow most of the relevant objects to be allocated on the stack, improving performance and making the code easier to manage.
########## banyand/stream/stream.go: ########## @@ -21,25 +21,70 @@ package stream import ( "context" + "errors" + "fmt" + "io" + "sort" + "time" + "go.uber.org/multierr" + + "github.com/apache/skywalking-banyandb/api/common" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/tsdb" - "github.com/apache/skywalking-banyandb/banyand/tsdb/index" + itersort "github.com/apache/skywalking-banyandb/pkg/iter/sort" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/partition" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/schema" +) + +const ( + maxValuesBlockSize = 8 * 1024 * 1024 + maxTimestampsBlockSize = 8 * 1024 * 1024 + maxElementIDsBlockSize = 8 * 1024 * 1024 + maxTagFamiliesMetadataSize = 8 * 1024 * 1024 + maxUncompressedBlockSize = 2 * 1024 * 1024 + maxUncompressedPrimaryBlockSize = 128 * 1024 + + maxBlockLength = 8 * 1024 + defaultFlushTimeout = 5 * time.Second ) -// a chunk is 1MB. -const chunkSize = 1 << 20 +type option struct { + flushTimeout time.Duration +} + +// Query allow to retrieve elements in a series of streams. +type Query interface { + LoadGroup(name string) (schema.Group, bool) + Stream(stream *commonv1.Metadata) (Stream, error) +} + +// Stream allows inspecting elements' details. 
+type Stream interface { + io.Closer + GetSchema() *databasev1.Stream + GetIndexRules() []*databasev1.IndexRule + Query(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) + Sort(ctx context.Context, opts pbv1.StreamSortOptions) (pbv1.StreamSortResult, error) + Filter(ctx context.Context, opts pbv1.StreamFilterOptions) (pbv1.StreamFilterResult, error) +} Review Comment: Please move the methods related to queries to the `query.go` file. ########## pkg/pb/v1/metadata.go: ########## @@ -104,17 +104,73 @@ type Field struct { // Result is the result of a query. type Result struct { Timestamps []int64 + ElementIDs []string Review Comment: Please use a separate `StreamResult` here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
