hanahmily commented on code in PR #888:
URL: 
https://github.com/apache/skywalking-banyandb/pull/888#discussion_r2604877530


##########
banyand/liaison/grpc/discovery.go:
##########
@@ -255,11 +257,17 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata 
schema.Metadata) {
        e.RWMutex.Lock()
        defer e.RWMutex.Unlock()
        e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators, 
ModRevision: modRevision}
-       if schemaMetadata.Kind == schema.KindMeasure {
+       switch schemaMetadata.Kind {
+       case schema.KindMeasure:
                measure := schemaMetadata.Spec.(*databasev1.Measure)
                e.measureMap[id] = measure
-       } else {
-               delete(e.measureMap, id) // Ensure measure is not stored for 
streams
+       case schema.KindStream:
+               stream := schemaMetadata.Spec.(*databasev1.Stream)
+               e.streamMap[id] = stream
+       case schema.KindTrace:
+               trace := schemaMetadata.Spec.(*databasev1.Trace)
+               e.traceMap[id] = trace

Review Comment:
   @ButterBright, it appears this branch is redundant. Could you please 
double-check?



##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string, 
sampled bool) (err err
        return nil
 }
 
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest) 
error {
-       if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err != 
nil {
-               s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the 
element time is invalid")
-               return err
+func (s *streamService) validateWriteRequest(writeEntity 
*streamv1.WriteRequest,
+       metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+       if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); 
errTime != nil {
+               s.l.Error().Err(errTime).Stringer("written", 
writeEntity).Msg("the element time is invalid")
+               s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetMessageId(), stream)
+               return modelv1.Status_STATUS_INVALID_TIMESTAMP
        }
-       return nil
-}
 
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest) 
error {
-       if writeEntity.Metadata.ModRevision > 0 {
-               streamCache, existed := 
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+       if metadata.ModRevision > 0 {
+               streamCache, existed := s.entityRepo.getLocator(getID(metadata))
                if !existed {
-                       return errors.New("stream schema not found")
+                       s.l.Error().Stringer("written", 
writeEntity).Msg("stream schema not found")
+                       s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND, 
writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_NOT_FOUND
                }
-               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
-                       return errors.New("expired stream schema")
+               if metadata.ModRevision != streamCache.ModRevision {
+                       s.l.Error().Stringer("written", writeEntity).Msg("the 
stream schema is expired")
+                       s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+                       return modelv1.Status_STATUS_EXPIRED_SCHEMA
+               }
+       }
+
+       return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+       writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+       tagFamilies := writeRequest.GetElement().GetTagFamilies()
+       if spec == nil {
+               return s.navigateByLocator(metadata, tagFamilies)
+       }
+       return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(

Review Comment:
   This function has a performance flaw due to parsing the entity. We verified 
this flaw 2 years ago. That's why we introduced "navigateByLocator".
   
   You should build "entityLocator" and "shardingKeyLocator" outside the gRPC 
write loop, then pass them into "navigateByLocator".



##########
banyand/metadata/schema/etcd.go:
##########
@@ -400,7 +401,11 @@ func (e *etcdSchemaRegistry) listWithPrefix(ctx 
context.Context, prefix string,
                return nil, ErrClosed
        }
        defer e.closer.Done()
+       hasTrailingSlash := strings.HasSuffix(prefix, "/")
        prefix = e.prependNamespace(prefix)
+       if hasTrailingSlash && !strings.HasSuffix(prefix, "/") {

Review Comment:
   Update prependNamespace to make sure the prefix it returns ends with "/".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to