hanahmily commented on code in PR #888:
URL:
https://github.com/apache/skywalking-banyandb/pull/888#discussion_r2604877530
##########
banyand/liaison/grpc/discovery.go:
##########
@@ -255,11 +257,17 @@ func (e *entityRepo) OnAddOrUpdate(schemaMetadata
schema.Metadata) {
e.RWMutex.Lock()
defer e.RWMutex.Unlock()
e.entitiesMap[id] = partition.Locator{TagLocators: l.TagLocators,
ModRevision: modRevision}
- if schemaMetadata.Kind == schema.KindMeasure {
+ switch schemaMetadata.Kind {
+ case schema.KindMeasure:
measure := schemaMetadata.Spec.(*databasev1.Measure)
e.measureMap[id] = measure
- } else {
- delete(e.measureMap, id) // Ensure measure is not stored for
streams
+ case schema.KindStream:
+ stream := schemaMetadata.Spec.(*databasev1.Stream)
+ e.streamMap[id] = stream
+ case schema.KindTrace:
+ trace := schemaMetadata.Spec.(*databasev1.Trace)
+ e.traceMap[id] = trace
Review Comment:
@ButterBright, it appears this branch is redundant. Could you please
double-check?
##########
banyand/liaison/grpc/stream.go:
##########
@@ -74,33 +76,149 @@ func (s *streamService) activeQueryAccessLog(root string,
sampled bool) (err err
return nil
}
-func (s *streamService) validateTimestamp(writeEntity *streamv1.WriteRequest)
error {
- if err := timestamp.CheckPb(writeEntity.GetElement().Timestamp); err !=
nil {
- s.l.Error().Stringer("written", writeEntity).Err(err).Msg("the
element time is invalid")
- return err
+func (s *streamService) validateWriteRequest(writeEntity
*streamv1.WriteRequest,
+ metadata *commonv1.Metadata, stream streamv1.StreamService_WriteServer,
+) modelv1.Status {
+ if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp);
errTime != nil {
+ s.l.Error().Err(errTime).Stringer("written",
writeEntity).Msg("the element time is invalid")
+ s.sendReply(metadata, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_INVALID_TIMESTAMP
}
- return nil
-}
-func (s *streamService) validateMetadata(writeEntity *streamv1.WriteRequest)
error {
- if writeEntity.Metadata.ModRevision > 0 {
- streamCache, existed :=
s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+ if metadata.ModRevision > 0 {
+ streamCache, existed := s.entityRepo.getLocator(getID(metadata))
if !existed {
- return errors.New("stream schema not found")
+ s.l.Error().Stringer("written",
writeEntity).Msg("stream schema not found")
+ s.sendReply(metadata, modelv1.Status_STATUS_NOT_FOUND,
writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_NOT_FOUND
}
- if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
- return errors.New("expired stream schema")
+ if metadata.ModRevision != streamCache.ModRevision {
+ s.l.Error().Stringer("written", writeEntity).Msg("the
stream schema is expired")
+ s.sendReply(metadata,
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
+ return modelv1.Status_STATUS_EXPIRED_SCHEMA
+ }
+ }
+
+ return modelv1.Status_STATUS_SUCCEED
+}
+
+func (s *streamService) navigate(metadata *commonv1.Metadata,
+ writeRequest *streamv1.WriteRequest, spec []*streamv1.TagFamilySpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+ tagFamilies := writeRequest.GetElement().GetTagFamilies()
+ if spec == nil {
+ return s.navigateByLocator(metadata, tagFamilies)
+ }
+ return s.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (s *streamService) navigateByTagSpec(
Review Comment:
This function has a performance flaw due to parsing the entity. We verified
this flaw 2 years ago. That's why we introduced "navigateByLocator".
You should build "entityLocator" and "shardingKeyLocator" outside the gRPC
write loop, then pass them into navigateByLocator.
##########
banyand/metadata/schema/etcd.go:
##########
@@ -400,7 +401,11 @@ func (e *etcdSchemaRegistry) listWithPrefix(ctx
context.Context, prefix string,
return nil, ErrClosed
}
defer e.closer.Done()
+ hasTrailingSlash := strings.HasSuffix(prefix, "/")
prefix = e.prependNamespace(prefix)
+ if hasTrailingSlash && !strings.HasSuffix(prefix, "/") {
Review Comment:
Update prependNamespace so that the prefix it returns ends with "/".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]