Copilot commented on code in PR #888:
URL:
https://github.com/apache/skywalking-banyandb/pull/888#discussion_r2596292191
##########
banyand/liaison/grpc/discovery.go:
##########
@@ -255,11 +257,17 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata
schema.Metadata) {
e.RWMutex.Lock()
defer e.RWMutex.Unlock()
e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators,
ModRevision: modRevision}
- if schemaMetadata.Kind == schema.KindMeasure {
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
measure := schemaMetadata.Spec.(*databasev1.Measure)
e.measureMap[id] = measure
- } else {
- delete(e.measureMap, id) // Ensure measure is not stored for
streams
+ case schema.KindStream:
+ stream := schemaMetadata.Spec.(*databasev1.Stream)
+ e.streamMap[id] = stream
+ case schema.KindTrace:
+ trace := schemaMetadata.Spec.(*databasev1.Trace)
+ e.traceMap[id] = trace
Review Comment:
The trace schema is handled twice in this function: once in the first
switch statement (lines 236-252), which returns early, and again in the
second switch statement (lines 267-269). Because of that early return, the
trace case in the second switch is unreachable. Remove the unreachable code
at lines 267-269, or restructure the logic so that traces are handled in
only one place.
```suggestion
```
##########
banyand/trace/write_standalone.go:
##########
@@ -444,6 +463,32 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
+func buildSpecMap(spec *tracev1.TagSpec) map[string]int {
+ if spec == nil {
+ return nil
+ }
+ specMap := make(map[string]int, 0)
Review Comment:
The map is created with a size hint of 0, which is equivalent to giving no
hint at all, so the map may have to grow as entries are added. The number of
entries is known in advance (the number of tag names in the spec), so it
should be passed as the size hint.
Suggestion:
```go
specMap := make(map[string]int, len(spec.GetTagNames()))
```
```suggestion
specMap := make(map[string]int, len(spec.GetTagNames()))
```
##########
banyand/stream/write_standalone.go:
##########
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
return
}
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int,
map[string]map[string]int) {
+ if spec == nil {
+ return nil, nil
+ }
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
+ for i, specFamily := range spec {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
Review Comment:
The maps are initialized without capacity hints, which could cause multiple
reallocations. Since the size is known in advance from the spec, the maps
should be initialized with the appropriate capacity.
Suggestion:
```go
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
// ...
tagMap := make(map[string]int, len(specFamily.GetTagNames()))
```
```suggestion
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
for i, specFamily := range spec {
specFamilyMap[specFamily.GetName()] = i
tagMap := make(map[string]int, len(specFamily.GetTagNames()))
```
##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string,
sampled bool) (err err
return nil
}
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest)
error {
- if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err !=
nil {
- s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the
element time is invalid")
- return err
+func (s *streamService) validateWriteRequest(writeEntity
*streamv1.WriteRequest,
+ metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+ if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp);
errTime != nil {
+ s.l.Error().Err(errTime).Stringer("written",
writeEntity).Msg("the element time is invalid")
+ s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_INVALID_TIMESTAMP
}
- return nil
-}
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest)
error {
- if writeEntity.Metadata.ModRevision > 0 {
- streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+ if metadata.ModRevision > 0 {
+ streamCache, existed := s.entityRepo.getLocator(getID(metadata))
if !existed {
- return errors.New("stream schema not found")
+ s.l.Error().Stringer("written",
writeEntity).Msg("stream schema not found")
+ s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_NOT_FOUND
}
- if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
- return errors.New("expired stream schema")
+ if metadata.ModRevision != streamCache.ModRevision {
+ s.l.Error().Stringer("written", writeEntity).Msg("the
stream schema is expired")
+ s.sendReply(metadata,
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_EXPIRED_SCHEMA
+ }
+ }
+
+ return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+ writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+ tagFamilies := writeRequest.GetElement().GetTagFamilies()
+ if spec == nil {
+ return s.navigateByLocator(metadata, tagFamilies)
+ }
+ return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(
+ metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec,
tagFamilies []*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+ shardNum, existed := s.groupRepo.shardNum(metadata.Group)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding the shard num by: %v", metadata)
+ }
+ id := getID(metadata)
+ stream, ok := s.entityRepo.getStream(id)
+ if !ok {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding stream schema by: %v", metadata)
+ }
+ specFamilyMap, specTagMaps := s.buildSpecMaps(spec)
+
+ entityValues := s.findTagValuesByNames(
+ metadata.Name,
+ stream.GetTagFamilies(),
+ tagFamilies,
+ stream.GetEntity().GetTagNames(),
+ specFamilyMap,
+ specTagMaps,
+ )
+ entity, err := entityValues.ToEntity()
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+
+ shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+ return entityValues, common.ShardID(shardID), nil
+}
+
+func (s *streamService) buildSpecMaps(spec []*streamv1.TagFamilySpec)
(map[string]int, map[string]map[string]int) {
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
+ for i, specFamily := range spec {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
Review Comment:
The maps are initialized without capacity hints, which could cause multiple
reallocations. Since the size is known in advance from the spec, the maps
should be initialized with the appropriate capacity.
Suggestion:
```go
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
// ...
tagMap := make(map[string]int, len(specFamily.GetTagNames()))
```
```suggestion
specFamilyMap := make(map[string]int, len(spec))
specTagMaps := make(map[string]map[string]int, len(spec))
for i, specFamily := range spec {
specFamilyMap[specFamily.GetName()] = i
tagMap := make(map[string]int, len(specFamily.GetTagNames()))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]