hanahmily commented on code in PR #961:
URL:
https://github.com/apache/skywalking-banyandb/pull/961#discussion_r2750725413
##########
banyand/measure/block.go:
##########
@@ -1151,6 +1131,95 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
}
}
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int,
right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ uTimestamp := uint64(right.timestamps[rightIdx])
+ marshalBuf := make([]byte, 0, 128)
+ leftVer, rightVer := left.versions[leftIdx], right.versions[rightIdx]
+
+ for idx := range right.field.columns {
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(left.field.columns[idx].values[leftIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ // avoid index out-of-bounds issues
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ valueName := topNValue.valueName
+ entityTagNames := topNValue.entityTagNames
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, leftVer)
+
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(right.field.columns[idx].values[rightIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, rightVer)
+
+ topNValue.Reset()
+ topNValue.setMetadata(valueName, entityTagNames)
+
+ items, err := topNPostAggregator.Flush()
+ if err != nil {
+ log.Error().Err(err).Msg("failed to flush aggregator,
skip current batch")
Review Comment:
Panic when flushing fails. A flush failure indicates an implementation bug that the
DB cannot recover from.
##########
banyand/measure/block.go:
##########
@@ -1151,6 +1131,95 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
}
}
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int,
right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ uTimestamp := uint64(right.timestamps[rightIdx])
+ marshalBuf := make([]byte, 0, 128)
+ leftVer, rightVer := left.versions[leftIdx], right.versions[rightIdx]
+
+ for idx := range right.field.columns {
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(left.field.columns[idx].values[leftIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ // avoid index out-of-bounds issues
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ valueName := topNValue.valueName
+ entityTagNames := topNValue.entityTagNames
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, leftVer)
+
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(right.field.columns[idx].values[rightIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, rightVer)
+
+ topNValue.Reset()
+ topNValue.setMetadata(valueName, entityTagNames)
+
+ items, err := topNPostAggregator.Flush()
+ if err != nil {
+ log.Error().Err(err).Msg("failed to flush aggregator,
skip current batch")
+ continue
+ }
+
+ for _, item := range items {
+ topNValue.addValue(item.val, item.values)
+ }
+
+ marshalBuf = marshalBuf[:0]
+ buf, err := topNValue.marshal(marshalBuf)
+ if err != nil {
+ log.Error().Err(err).Msg("failed to marshal merged topN
value")
+ continue
+ }
+
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, buf)
+ }
Review Comment:
That makes sense. Refer to blockPointer for how to implement the column
initialization.
##########
banyand/measure/block.go:
##########
@@ -1151,6 +1131,95 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
}
}
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int,
right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ uTimestamp := uint64(right.timestamps[rightIdx])
+ marshalBuf := make([]byte, 0, 128)
+ leftVer, rightVer := left.versions[leftIdx], right.versions[rightIdx]
+
+ for idx := range right.field.columns {
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(left.field.columns[idx].values[leftIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ // avoid index out-of-bounds issues
Review Comment:
Keep whichever value can be unmarshaled successfully. Do not emit an empty
value when either side is malformed.
##########
banyand/measure/block.go:
##########
@@ -1151,6 +1131,95 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
}
}
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int,
right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ uTimestamp := uint64(right.timestamps[rightIdx])
+ marshalBuf := make([]byte, 0, 128)
+ leftVer, rightVer := left.versions[leftIdx], right.versions[rightIdx]
+
+ for idx := range right.field.columns {
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(left.field.columns[idx].values[leftIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ // avoid index out-of-bounds issues
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ valueName := topNValue.valueName
+ entityTagNames := topNValue.entityTagNames
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, leftVer)
+
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(right.field.columns[idx].values[rightIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, rightVer)
+
+ topNValue.Reset()
+ topNValue.setMetadata(valueName, entityTagNames)
+
+ items, err := topNPostAggregator.Flush()
+ if err != nil {
+ log.Error().Err(err).Msg("failed to flush aggregator,
skip current batch")
+ continue
+ }
+
+ for _, item := range items {
+ topNValue.addValue(item.val, item.values)
+ }
+
+ marshalBuf = marshalBuf[:0]
+ buf, err := topNValue.marshal(marshalBuf)
+ if err != nil {
+ log.Error().Err(err).Msg("failed to marshal merged topN
value")
Review Comment:
Panic here as well.
##########
banyand/measure/metadata.go:
##########
@@ -712,3 +713,35 @@ func GetTopNSchemaMetadata(group string)
*commonv1.Metadata {
func getKey(metadata *commonv1.Metadata) string {
return path.Join(metadata.GetGroup(), metadata.GetName())
}
+
+// TopNParameters defines the structure for the "parameters" tag value (JSON).
+type TopNParameters struct {
Review Comment:
Since there is only one value here, we don't need to encode it as JSON in the tag either.
##########
banyand/measure/block.go:
##########
@@ -1151,6 +1131,95 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
}
}
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int,
right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
Review Comment:
You should reset the PostProcessor before putting values into it.
##########
banyand/measure/merger_test.go:
##########
@@ -211,6 +212,102 @@ func Test_mergeTwoBlocks(t *testing.T) {
},
want: &blockPointer{block: mergedBlock, bm:
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 4}}},
},
+ {
Review Comment:
Add more test cases:
1. Left and right have identical first timestamps, reproducing the issue found by
Copilot.
2. A malformed value on the left side, the right side, or both.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]