Copilot commented on code in PR #869:
URL:
https://github.com/apache/skywalking-banyandb/pull/869#discussion_r2589361462
##########
banyand/liaison/grpc/measure.go:
##########
@@ -190,44 +217,185 @@ func (ms *measureService) processAndPublishRequest(ctx
context.Context, writeReq
EntityValues: tagValues[1:].Encode(),
}
- nodes, err := ms.publishToNodes(ctx, writeRequest, iwr, publisher,
uint32(shardID), measure)
+ nodes, err := ms.publishToNodes(ctx, writeRequest, metadata, spec, iwr,
publisher, uint32(shardID), measure, nodeMetadataSent, nodeSpecSent)
if err != nil {
return err
}
*succeedSent = append(*succeedSent, succeedSentMessage{
- metadata: writeRequest.GetMetadata(),
+ metadata: metadata,
messageID: writeRequest.GetMessageId(),
nodes: nodes,
})
return nil
}
-func (ms *measureService) publishToNodes(ctx context.Context, writeRequest
*measurev1.WriteRequest, iwr *measurev1.InternalWriteRequest,
+func (ms *measureService) publishToNodes(ctx context.Context, writeRequest
*measurev1.WriteRequest,
+ metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, iwr
*measurev1.InternalWriteRequest,
publisher queue.BatchPublisher, shardID uint32, measure
measurev1.MeasureService_WriteServer,
+ nodeMetadataSent map[string]bool, nodeSpecSent map[string]bool,
) ([]string, error) {
- nodeID, errPickNode :=
ms.nodeRegistry.Locate(writeRequest.GetMetadata().GetGroup(),
writeRequest.GetMetadata().GetName(), shardID, 0)
+ nodeID, errPickNode := ms.nodeRegistry.Locate(metadata.GetGroup(),
metadata.GetName(), shardID, 0)
if errPickNode != nil {
ms.l.Error().Err(errPickNode).RawJSON("written",
logger.Proto(writeRequest)).Msg("failed to pick an available node")
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR,
writeRequest.GetMessageId(), measure)
return nil, errPickNode
}
+ if !nodeMetadataSent[nodeID] {
+ iwr.Request.Metadata = metadata
+ nodeMetadataSent[nodeID] = true
+ }
+ if spec != nil && !nodeSpecSent[nodeID] {
+ iwr.Request.DataPointSpec = spec
+ nodeSpecSent[nodeID] = true
+ }
+
message :=
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr)
_, errWritePub := publisher.Publish(ctx, data.TopicMeasureWrite,
message)
if errWritePub != nil {
ms.l.Error().Err(errWritePub).RawJSON("written",
logger.Proto(writeRequest)).Str("nodeID", nodeID).Msg("failed to send a
message")
var ce *common.Error
if errors.As(errWritePub, &ce) {
- ms.sendReply(writeRequest.GetMetadata(), ce.Status(),
writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, ce.Status(),
writeRequest.GetMessageId(), measure)
return nil, errWritePub
}
- ms.sendReply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure)
+ ms.sendReply(metadata, modelv1.Status_STATUS_INTERNAL_ERROR,
writeRequest.GetMessageId(), measure)
return nil, errWritePub
}
return []string{nodeID}, nil
}
+func (ms *measureService) navigate(metadata *commonv1.Metadata,
+ writeRequest *measurev1.WriteRequest, spec *measurev1.DataPointSpec,
+) (pbv1.EntityValues, common.ShardID, error) {
+ tagFamilies := writeRequest.GetDataPoint().GetTagFamilies()
+ if spec == nil {
+ return ms.navigateByLocator(metadata, tagFamilies)
+ }
+ return ms.navigateByTagSpec(metadata, spec, tagFamilies)
+}
+
+func (ms *measureService) navigateByTagSpec(
+ metadata *commonv1.Metadata, spec *measurev1.DataPointSpec, tagFamilies
[]*modelv1.TagFamilyForWrite,
+) (pbv1.EntityValues, common.ShardID, error) {
+ shardNum, existed := ms.groupRepo.shardNum(metadata.Group)
+ if !existed {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding the shard num by: %v", metadata)
+ }
+ id := getID(metadata)
+ measure, ok := ms.entityRepo.getMeasure(id)
+ if !ok {
+ return nil, common.ShardID(0), errors.Wrapf(errNotExist,
"finding measure schema by: %v", metadata)
+ }
+ specFamilyMap, specTagMaps := ms.buildSpecMaps(spec)
+
+ entityValues := ms.findTagValuesByNames(
+ metadata.Name,
+ measure.GetTagFamilies(),
+ tagFamilies,
+ measure.GetEntity().GetTagNames(),
+ specFamilyMap,
+ specTagMaps,
+ )
+ entity, err := entityValues.ToEntity()
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+
+ shardingKey := measure.GetShardingKey()
+ if shardingKey != nil && len(shardingKey.GetTagNames()) > 0 {
+ shardingKeyValues := ms.findTagValuesByNames(
+ metadata.Name,
+ measure.GetTagFamilies(),
+ tagFamilies,
+ shardingKey.GetTagNames(),
+ specFamilyMap,
+ specTagMaps,
+ )
+ shardingEntity, shardingErr := shardingKeyValues.ToEntity()
+ if shardingErr != nil {
+ return nil, common.ShardID(0), shardingErr
+ }
+ shardID, shardingErr :=
partition.ShardID(shardingEntity.Marshal(), shardNum)
+ if shardingErr != nil {
+ return nil, common.ShardID(0), shardingErr
+ }
+ return entityValues, common.ShardID(shardID), nil
+ }
+
+ shardID, err := partition.ShardID(entity.Marshal(), shardNum)
+ if err != nil {
+ return nil, common.ShardID(0), err
+ }
+ return entityValues, common.ShardID(shardID), nil
+}
+
+func (ms *measureService) buildSpecMaps(spec *measurev1.DataPointSpec)
(map[string]int, map[string]map[string]int) {
+ specFamilyMap := make(map[string]int)
+ specTagMaps := make(map[string]map[string]int)
+ for i, specFamily := range spec.GetTagFamilySpec() {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
+ for j, tagName := range specFamily.GetTagNames() {
+ tagMap[tagName] = j
+ }
+ specTagMaps[specFamily.GetName()] = tagMap
+ }
+ return specFamilyMap, specTagMaps
+}
+
+func (ms *measureService) findTagValuesByNames(
+ subject string,
+ schemaFamilies []*databasev1.TagFamilySpec,
+ srcTagFamilies []*modelv1.TagFamilyForWrite,
+ tagNames []string,
+ specFamilyMap map[string]int,
+ specTagMaps map[string]map[string]int,
+) pbv1.EntityValues {
+ entityValues := make(pbv1.EntityValues, len(tagNames)+1)
+ entityValues[0] = pbv1.EntityStrValue(subject)
+ for i, tagName := range tagNames {
+ tagValue := ms.findTagValueByName(schemaFamilies,
srcTagFamilies, tagName, specFamilyMap, specTagMaps)
+ if tagValue == nil {
+ entityValues[i+1] = &modelv1.TagValue{Value:
&modelv1.TagValue_Null{}}
+ } else {
+ entityValues[i+1] = tagValue
+ }
+ }
+ return entityValues
+}
+
+func (ms *measureService) findTagValueByName(
+ schemaFamilies []*databasev1.TagFamilySpec,
+ srcTagFamilies []*modelv1.TagFamilyForWrite,
+ tagName string,
+ specFamilyMap map[string]int,
+ specTagMaps map[string]map[string]int,
+) *modelv1.TagValue {
+ for _, schemaFamily := range schemaFamilies {
+ for _, schemaTag := range schemaFamily.GetTags() {
+ if schemaTag.GetName() != tagName {
+ continue
+ }
+ familyIdx, ok := specFamilyMap[schemaFamily.GetName()]
+ if !ok || familyIdx >= len(srcTagFamilies) {
+ return nil
+ }
+ tagMap := specTagMaps[schemaFamily.GetName()]
+ if tagMap == nil {
+ return nil
+ }
+ tagIdx, ok := tagMap[tagName]
+ if !ok || tagIdx >=
len(srcTagFamilies[familyIdx].GetTags()) {
+ return nil
+ }
+ return srcTagFamilies[familyIdx].GetTags()[tagIdx]
+ }
+ }
+ return nil
+}
+
func (ms *measureService) sendReply(metadata *commonv1.Metadata, status
modelv1.Status, messageID uint64, measure measurev1.MeasureService_WriteServer)
{
if status != modelv1.Status_STATUS_SUCCEED {
ms.metrics.totalStreamMsgReceivedErr.Inc(1, metadata.Group,
"measure", "write")
Review Comment:
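Potential nil pointer dereference. When `sendReply` is called with `nil`
metadata (line 117), it panics at line 401 when it accesses
`metadata.Group`. This should be fixed by either:
1. Checking whether metadata is nil before accessing its fields in `sendReply`, or
2. Passing a placeholder (empty) metadata value instead of nil when calling
`sendReply`.
Note that the fix below (option 1) skips the per-group metrics entirely when
metadata is nil, so those error replies go uncounted. Because generated
protobuf getters are nil-safe, reading `metadata.GetGroup()` instead of
`metadata.Group` also avoids the panic. It still records the metrics, under an
empty group label.
Recommended fix: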
```go
func (ms *measureService) sendReply(metadata *commonv1.Metadata, status
modelv1.Status, messageID uint64, measure measurev1.MeasureService_WriteServer)
{
if metadata != nil {
if status != modelv1.Status_STATUS_SUCCEED {
ms.metrics.totalStreamMsgReceivedErr.Inc(1,
metadata.Group, "measure", "write")
}
ms.metrics.totalStreamMsgSent.Inc(1, metadata.Group, "measure",
"write")
}
if errResp := measure.Send(&measurev1.WriteResponse{Metadata: metadata,
Status: status.String(), MessageId: messageID}); errResp != nil {
if dl := ms.l.Debug(); dl.Enabled() {
dl.Err(errResp).Msg("failed to send measure write
response")
}
if metadata != nil {
ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group,
"measure", "write")
}
}
}
```
##########
banyand/measure/write_standalone.go:
##########
@@ -250,35 +270,68 @@ func newDpt(segment storage.Segment[*tsTable, option],
timeRange timestamp.TimeR
return dpt
}
-func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest,
locator partition.IndexRuleLocator) ([]nameValues, []index.Field) {
+func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest,
+ locator partition.IndexRuleLocator, spec *measurev1.DataPointSpec,
+) ([]nameValues, []index.Field) {
tagFamilies := make([]nameValues, 0, len(schema.TagFamilies))
+ var specFamilyMap map[string]int
+ var specTagMaps map[string]map[string]int
+ if spec != nil {
+ specFamilyMap = make(map[string]int)
+ specTagMaps = make(map[string]map[string]int)
+ for i, specFamily := range spec.GetTagFamilySpec() {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
+ for j, tagName := range specFamily.GetTagNames() {
+ tagMap[tagName] = j
+ }
+ specTagMaps[specFamily.GetName()] = tagMap
+ }
+ }
Review Comment:
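There is significant code duplication between `handleTagFamily` and
`handleIndexMode` in the spec map building logic (e.g. lines 278-291): the same
construction is repeated in both functions. A near-identical `buildSpecMaps`
method also exists in `banyand/liaison/grpc/measure.go`. Consider extracting
this into a shared helper function to reduce the maintenance burden and keep
the copies consistent.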
Example:
```go
func buildSpecMaps(spec *measurev1.DataPointSpec) (map[string]int,
map[string]map[string]int) {
if spec == nil {
return nil, nil
}
specFamilyMap := make(map[string]int)
specTagMaps := make(map[string]map[string]int)
for i, specFamily := range spec.GetTagFamilySpec() {
specFamilyMap[specFamily.GetName()] = i
tagMap := make(map[string]int)
for j, tagName := range specFamily.GetTagNames() {
tagMap[tagName] = j
}
specTagMaps[specFamily.GetName()] = tagMap
}
return specFamilyMap, specTagMaps
}
```
##########
banyand/measure/write_standalone.go:
##########
@@ -250,35 +270,68 @@ func newDpt(segment storage.Segment[*tsTable, option],
timeRange timestamp.TimeR
return dpt
}
-func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest,
locator partition.IndexRuleLocator) ([]nameValues, []index.Field) {
+func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest,
+ locator partition.IndexRuleLocator, spec *measurev1.DataPointSpec,
+) ([]nameValues, []index.Field) {
tagFamilies := make([]nameValues, 0, len(schema.TagFamilies))
+ var specFamilyMap map[string]int
+ var specTagMaps map[string]map[string]int
+ if spec != nil {
+ specFamilyMap = make(map[string]int)
+ specTagMaps = make(map[string]map[string]int)
+ for i, specFamily := range spec.GetTagFamilySpec() {
+ specFamilyMap[specFamily.GetName()] = i
+ tagMap := make(map[string]int)
+ for j, tagName := range specFamily.GetTagNames() {
+ tagMap[tagName] = j
+ }
+ specTagMaps[specFamily.GetName()] = tagMap
+ }
+ }
+
var fields []index.Field
for i := range schema.GetTagFamilies() {
- var tagFamily *modelv1.TagFamilyForWrite
- if len(req.DataPoint.TagFamilies) <= i {
- tagFamily = pbv1.NullTagFamily
- } else {
- tagFamily = req.DataPoint.TagFamilies[i]
- }
- tfr := locator.TagFamilyTRule[i]
tagFamilySpec := schema.GetTagFamilies()[i]
+ tfr := locator.TagFamilyTRule[i]
+
+ var srcFamily *modelv1.TagFamilyForWrite
+ var specTagMap map[string]int
+ if spec != nil {
+ specIdx, ok := specFamilyMap[tagFamilySpec.Name]
+ if ok && specIdx < len(req.DataPoint.TagFamilies) {
+ srcFamily = req.DataPoint.TagFamilies[specIdx]
+ }
+ specTagMap = specTagMaps[tagFamilySpec.Name]
+ } else if len(req.DataPoint.TagFamilies) > i {
+ srcFamily = req.DataPoint.TagFamilies[i]
+ }
+
tf := nameValues{
name: tagFamilySpec.Name,
}
for j := range tagFamilySpec.Tags {
+ t := tagFamilySpec.Tags[j]
+
var tagValue *modelv1.TagValue
- if tagFamily == pbv1.NullTagFamily ||
len(tagFamily.Tags) <= j {
- tagValue = pbv1.NullTagValue
+ if spec != nil {
+ if srcFamily != nil && specTagMap != nil {
+ if srcTagIdx, ok := specTagMap[t.Name];
ok && srcTagIdx < len(srcFamily.Tags) {
+ tagValue =
srcFamily.Tags[srcTagIdx]
+ }
+ }
+ if tagValue == nil {
+ tagValue = pbv1.NullTagValue
+ }
} else {
- tagValue = tagFamily.Tags[j]
+ if srcFamily == nil || len(srcFamily.Tags) <= j
{
+ tagValue = pbv1.NullTagValue
+ } else {
+ tagValue = srcFamily.Tags[j]
+ }
}
Review Comment:
There is significant code duplication between `handleTagFamily` and
`handleIndexMode` in the tag value extraction logic (lines 298-332 vs
395-426). Both determine the source tag family and extract tag values in
nearly identical ways. Consider extracting this into a shared helper
function to improve maintainability. The example below covers selecting the
source tag family; the per-tag value lookup can be extracted the same way.
Example:
```go
func getSourceFamilyAndTagMap(spec *measurev1.DataPointSpec, tagFamilySpec
*databasev1.TagFamilySpec,
req *measurev1.WriteRequest, i int, specFamilyMap map[string]int,
specTagMaps map[string]map[string]int) (*modelv1.TagFamilyForWrite,
map[string]int) {
var srcFamily *modelv1.TagFamilyForWrite
var specTagMap map[string]int
if spec != nil {
specIdx, ok := specFamilyMap[tagFamilySpec.Name]
if ok && specIdx < len(req.DataPoint.TagFamilies) {
srcFamily = req.DataPoint.TagFamilies[specIdx]
}
specTagMap = specTagMaps[tagFamilySpec.Name]
} else if len(req.DataPoint.TagFamilies) > i {
srcFamily = req.DataPoint.TagFamilies[i]
}
return srcFamily, specTagMap
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]