Copilot commented on code in PR #888:
URL: 
https://github.com/apache/skywalking-banyandb/pull/888#discussion_r2596292191


##########
banyand/liaison/grpc/discovery.go:
##########
@@ -255,11 +257,17 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
        e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, 
ModRevision: modRevision}
-       if schemaMetadata.Kind == schema.KindMeasure {
+       switch schemaMetadata.Kind {
+       case schema.KindMeasure:
                measure := schemaMetadata.Spec.(*databasev1.Measure)
                e.measureMap[id] = measure
-       } else {
-               delete(e.measureMap, id) // Ensure measure is not stored for 
streams
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               e.streamMap[id] = stream
+       case schema.KindTrace:
+               trace := schemaMetadata.Spec.(*databasev1.Trace)
+               e.traceMap[id] = trace

Review Comment:
   The trace schema is handled twice in this function - once in the first 
switch statement (lines 236-252) where it returns early, and again in the 
second switch statement (lines 267-269). The second case for trace is 
unreachable because the function returns early in the first switch. This 
duplicate code should be removed or the logic should be restructured.
   
   Consider removing the unreachable code at lines 267-269 since traces are 
already handled in lines 236-252 with an early return.
   ```suggestion
   
   ```



##########
banyand/trace/write_standalone.go:
##########
@@ -444,6 +463,32 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
        return
 }
 
+func buildSpecMap(spec *tracev1.TagSpec) map[string]int {
+       if spec == nil {
+               return nil
+       }
+       specMap := make(map[string]int, 0)

Review Comment:
   The map is created with an explicit capacity hint of 0, which is equivalent 
to giving no hint at all, so the map may have to grow repeatedly as entries are 
added. Since the number of entries is known in advance, the map should be 
initialized with that capacity.
   
   Suggestion:
   ```go
   specMap := make(map[string]int, len(spec.GetTagNames()))
   ```
   ```suggestion
        specMap := make(map[string]int, len(spec.GetTagNames()))
   ```



##########
banyand/stream/write_standalone.go:
##########
@@ -327,6 +347,41 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
        return
 }
 
+func buildSpecMaps(spec []*streamv1.TagFamilySpec) (map[string]int, 
map[string]map[string]int) {
+       if spec == nil {
+               return nil, nil
+       }
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)
+       for i, specFamily := range spec {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int)

Review Comment:
   The maps are initialized without capacity hints, which could cause multiple 
reallocations. Since the size is known in advance from the spec, the maps 
should be initialized with the appropriate capacity.
   
   Suggestion:
   ```go
   specFamilyMap := make(map[string]int, len(spec))
   specTagMaps := make(map[string]map[string]int, len(spec))
   // ...
   tagMap := make(map[string]int, len(specFamily.GetTagNames()))
   ```
   ```suggestion
        specFamilyMap := make(map[string]int, len(spec))
        specTagMaps := make(map[string]map[string]int, len(spec))
        for i, specFamily := range spec {
                specFamilyMap[specFamily.GetName()] = i
                tagMap := make(map[string]int, len(specFamily.GetTagNames()))
   ```



##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string, 
sampled bool) (err err
        return nil
 }
 
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest) 
error {
-       if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err != 
nil {
-               s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the 
element time is invalid")
-               return err
+func (s *streamService) validateWriteRequest(writeEntity 
*streamv1.WriteRequest,
+       metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+       if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); 
errTime != nil {
+               s.l.Error().Err(errTime).Stringer("written", 
writeEntity).Msg("the element time is invalid")
+               s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetMessageId(), stream)
+               return modelv1.Status_STATUS_INVALID_TIMESTAMP
        }
-       return nil
-}
 
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest) 
error {
-       if writeEntity.Metadata.ModRevision > 0 {
-               streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+       if metadata.ModRevision > 0 {
+               streamCache, existed := s.entityRepo.getLocator(getID(metadata))
                if !existed {
-                       return errors.New("stream schema not found")
+                       s.l.Error().Stringer("written", 
writeEntity).Msg("stream schema not found")
+                       s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_NOT_FOUND
                }
-               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
-                       return errors.New("expired stream schema")
+               if metadata.ModRevision != streamCache.ModRevision {
+                       s.l.Error().Stringer("written", writeEntity).Msg("the 
stream schema is expired")
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_EXPIRED_SCHEMA
+               }
+       }
+
+       return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+       writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+       tagFamilies := writeRequest.GetElement().GetTagFamilies()
+       if spec == nil {
+               return s.navigateByLocator(metadata, tagFamilies)
+       }
+       return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(
+       metadata *commonv1.Metadata, spec []*streamv1.TagFamilySpec, 
tagFamilies []*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+       shardNum, existed := s.groupRepo.shardNum(metadata.Group)
+       if !existed {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
+       }
+       id := getID(metadata)
+       stream, ok := s.entityRepo.getStream(id)
+       if !ok {
+               return nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding stream schema by: %v", metadata)
+       }
+       specFamilyMap, specTagMaps := s.buildSpecMaps(spec)
+
+       entityValues := s.findTagValuesByNames(
+               metadata.Name,
+               stream.GetTagFamilies(),
+               tagFamilies,
+               stream.GetEntity().GetTagNames(),
+               specFamilyMap,
+               specTagMaps,
+       )
+       entity, err := entityValues.ToEntity()
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+
+       shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+       if err != nil {
+               return nil, common.ShardID(0), err
+       }
+       return entityValues, common.ShardID(shardID), nil
+}
+
+func (s *streamService) buildSpecMaps(spec []*streamv1.TagFamilySpec) 
(map[string]int, map[string]map[string]int) {
+       specFamilyMap := make(map[string]int)
+       specTagMaps := make(map[string]map[string]int)
+       for i, specFamily := range spec {
+               specFamilyMap[specFamily.GetName()] = i
+               tagMap := make(map[string]int)

Review Comment:
   The maps are initialized without capacity hints, which could cause multiple 
reallocations. Since the size is known in advance from the spec, the maps 
should be initialized with the appropriate capacity.
   
   Suggestion:
   ```go
   specFamilyMap := make(map[string]int, len(spec))
   specTagMaps := make(map[string]map[string]int, len(spec))
   // ...
   tagMap := make(map[string]int, len(specFamily.GetTagNames()))
   ```
   ```suggestion
        specFamilyMap := make(map[string]int, len(spec))
        specTagMaps := make(map[string]map[string]int, len(spec))
        for i, specFamily := range spec {
                specFamilyMap[specFamily.GetName()] = i
                tagMap := make(map[string]int, len(specFamily.GetTagNames()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to