Copilot commented on code in PR #900:
URL: https://github.com/apache/skywalking-banyandb/pull/900#discussion_r2638354735
##########
banyand/cmd/dump/stream.go:
##########
@@ -516,6 +528,20 @@ func openStreamPart(id uint64, root string, fileSystem fs.FileSystem) (*streamPa
return nil, fmt.Errorf("cannot open timestamps.bin: %w", err)
}
+ // Try to open series metadata file (optional, for backward compatibility)
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+ reader, err := fileSystem.OpenFile(seriesMetadataPath)
+ if err != nil {
+ // Only ignore file not found errors; other errors should be reported
+ var fsErr *fs.FileSystemError
+ if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+ fmt.Fprintf(os.Stderr, "Warning: Failed to open series metadata file %s: %v\n", seriesMetadataPath, err)
+ }
+ // File doesn't exist, it's okay - just continue without it
+ } else {
+ p.seriesMetadata = reader
+ }
Review Comment:
The error handling logic for opening the series metadata file (lines
531-543) is duplicated identically across openFilePart in trace.go,
openStreamPart in stream.go, and openMeasurePart in measure.go. This represents
significant code duplication. Consider extracting this into a shared helper
function to reduce duplication and improve maintainability.
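
For example, the duplicated block could collapse into a single helper along these lines (a sketch only; `openOptionalSeriesMetadata` is a hypothetical name, the `fs.File` return type is assumed from the later `SequentialRead()` call on `p.seriesMetadata`, and the imports already present in these files are assumed):

```go
// openOptionalSeriesMetadata is a hypothetical shared helper (name and fs.File
// return type are assumptions). It opens the optional smeta.bin in a part
// directory, returning nil when the file is absent and printing a warning for
// any other error, mirroring the logic duplicated across the three open*Part functions.
func openOptionalSeriesMetadata(partPath string, fileSystem fs.FileSystem) fs.File {
	seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
	reader, err := fileSystem.OpenFile(seriesMetadataPath)
	if err != nil {
		// Only ignore file-not-found errors; report anything else.
		var fsErr *fs.FileSystemError
		if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
			fmt.Fprintf(os.Stderr, "Warning: Failed to open series metadata file %s: %v\n", seriesMetadataPath, err)
		}
		return nil
	}
	return reader
}
```

Each open*Part function would then reduce to a single assignment such as `p.seriesMetadata = openOptionalSeriesMetadata(partPath, fileSystem)`.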
##########
banyand/cmd/dump/trace.go:
##########
@@ -1297,3 +1341,38 @@ func writeTraceRowAsCSV(writer *csv.Writer, row traceRowData, tagColumns []strin
return writer.Write(csvRow)
}
+
+func (ctx *traceDumpContext) parseAndDisplaySeriesMetadata(partID uint64, p *part) error {
+ // Read all data from series metadata file
+ seqReader := p.seriesMetadata.SequentialRead()
+ defer seqReader.Close()
+
+ readMetadataBytes, err := io.ReadAll(seqReader)
+ if err != nil {
+ return fmt.Errorf("failed to read series metadata: %w", err)
+ }
+
+ if len(readMetadataBytes) == 0 {
+ return nil // Empty file, nothing to parse
+ }
+
+ // Unmarshal Documents
+ var docs index.Documents
+ if err := docs.Unmarshal(readMetadataBytes); err != nil {
+ return fmt.Errorf("failed to unmarshal series metadata: %w", err)
+ }
+
+ if len(docs) == 0 {
+ return nil // No documents
+ }
+
+ // Store EntityValues in partSeriesMap for use in CSV output
+ partMap := make(map[common.SeriesID]string)
+ for _, doc := range docs {
+ seriesID := common.SeriesID(convert.Hash(doc.EntityValues))
+ partMap[seriesID] = string(doc.EntityValues)
+ }
+ ctx.partSeriesMap[partID] = partMap
+
+ return nil
+}
Review Comment:
The parseAndDisplaySeriesMetadata function is duplicated identically across
trace.go, stream.go, and measure.go (lines 1345-1378, 1145-1178, and 1343-1376
respectively). This violates the DRY principle and makes maintenance harder.
Consider extracting this logic into a shared helper function that accepts a
generic reader interface, since the logic is identical except for the receiver
type.
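
One possible shape for that helper (a sketch; `parseSeriesMetadata` is a hypothetical name, and it relies only on the `io.ReadAll`, `index.Documents`, `convert.Hash`, and `common.SeriesID` usages already shown in this diff — the sequential reader satisfies `io.Reader`, as the existing `io.ReadAll(seqReader)` call demonstrates):

```go
// parseSeriesMetadata is a hypothetical shared helper: it decodes the raw
// smeta.bin contents into the SeriesID -> EntityValues map that each dump
// context currently builds on its own.
func parseSeriesMetadata(r io.Reader) (map[common.SeriesID]string, error) {
	data, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("failed to read series metadata: %w", err)
	}
	if len(data) == 0 {
		return nil, nil // empty file, nothing to parse
	}

	var docs index.Documents
	if err := docs.Unmarshal(data); err != nil {
		return nil, fmt.Errorf("failed to unmarshal series metadata: %w", err)
	}

	// Map each document's hashed EntityValues to its printable form.
	partMap := make(map[common.SeriesID]string, len(docs))
	for _, doc := range docs {
		partMap[common.SeriesID(convert.Hash(doc.EntityValues))] = string(doc.EntityValues)
	}
	return partMap, nil
}
```

Each context method would then shrink to opening the sequential reader, deferring its Close, calling the helper, and storing any non-nil result in ctx.partSeriesMap[partID].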
##########
banyand/cmd/dump/trace_test.go:
##########
@@ -187,3 +191,91 @@ func decodeStringArray(data []byte) []string {
}
return values
}
+
+// TestDumpTracePartWithSeriesMetadata tests that the dump tool can parse smeta file.
+// This test creates a part with series metadata and verifies the dump tool can correctly parse it.
+func TestDumpTracePartWithSeriesMetadata(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create a part with series metadata
+ partPath, cleanup := createTestTracePartWithSeriesMetadata(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openFilePart(partID, filepath.Dir(partPath), fileSystem)
+ require.NoError(t, err, "should be able to open part with series metadata")
+ defer closePart(p)
+
+ // Verify series metadata reader is available
+ assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+ // Create a dump context to test parsing
+ opts := traceDumpOptions{
+ shardPath: filepath.Dir(partPath),
+ segmentPath: tmpPath, // Not used for this test
+ verbose: false,
+ csvOutput: false,
+ }
+ ctx, err := newTraceDumpContext(opts)
+ require.NoError(t, err)
+ if ctx != nil {
+ defer ctx.close()
+ }
+
+ // Test parsing series metadata
+ err = ctx.parseAndDisplaySeriesMetadata(partID, p)
+ require.NoError(t, err, "should be able to parse series metadata")
+
+ // Verify EntityValues are stored in partSeriesMap
+ require.NotNil(t, ctx.partSeriesMap, "partSeriesMap should be initialized")
+ partMap, exists := ctx.partSeriesMap[partID]
+ require.True(t, exists, "partSeriesMap should contain entry for partID")
+ require.NotNil(t, partMap, "partMap should not be nil")
+
+ // Verify EntityValues are correctly stored
+ // Calculate expected SeriesIDs from EntityValues
+ expectedSeriesID1 := common.SeriesID(convert.Hash([]byte("service.name=test-service")))
+ expectedSeriesID2 := common.SeriesID(convert.Hash([]byte("service.name=another-service")))
+
+ assert.Contains(t, partMap, expectedSeriesID1, "partMap should contain first series")
+ assert.Contains(t, partMap, expectedSeriesID2, "partMap should contain second series")
+ assert.Equal(t, "service.name=test-service", partMap[expectedSeriesID1], "EntityValues should match")
+ assert.Equal(t, "service.name=another-service", partMap[expectedSeriesID2], "EntityValues should match")
+}
+
+// createTestTracePartWithSeriesMetadata creates a test trace part with series metadata.
+func createTestTracePartWithSeriesMetadata(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ // Use trace package to create a part
+ partPath, cleanup := trace.CreateTestPartForDump(tmpPath, fileSystem)
+
+ // Create sample series metadata file
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+
+ // Create sample documents
+ docs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("service.name=test-service"),
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("service.name=another-service"),
+ },
+ }
+
+ seriesMetadataBytes, err := docs.Marshal()
+ if err != nil {
+ panic(fmt.Sprintf("failed to marshal series metadata documents: %v", err))
+ }
+ fs.MustFlush(fileSystem, seriesMetadataBytes, seriesMetadataPath, storage.FilePerm)
+
+ return partPath, cleanup
+}
Review Comment:
The createTestTracePartWithSeriesMetadata helper function is duplicated with
nearly identical logic across trace_test.go, stream_test.go, and
measure_test.go. The only differences are the package-specific
CreateTestPartForDump call and sample entity values. Consider extracting the
common logic into a shared test helper function to reduce duplication and
improve maintainability.
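
For example, the smeta.bin writing could live in one shared test helper (a sketch; `writeTestSeriesMetadata` is a hypothetical name, and the `index.Document` element type with a uint64 `DocID` is assumed from the `index.Documents` literals in this diff), leaving each test file to supply only its package-specific `CreateTestPartForDump` call and sample entity values:

```go
// writeTestSeriesMetadata is a hypothetical shared test helper (name and the
// index.Document element type are assumptions): it writes an smeta.bin file
// containing one document per entity value into an existing part directory.
func writeTestSeriesMetadata(partPath string, fileSystem fs.FileSystem, entityValues ...string) {
	docs := make(index.Documents, 0, len(entityValues))
	for i, v := range entityValues {
		docs = append(docs, index.Document{
			DocID:        uint64(i + 1), // assumes DocID is a uint64, as the literal 1/2 values suggest
			EntityValues: []byte(v),
		})
	}
	seriesMetadataBytes, err := docs.Marshal()
	if err != nil {
		panic(fmt.Sprintf("failed to marshal series metadata documents: %v", err))
	}
	fs.MustFlush(fileSystem, seriesMetadataBytes, filepath.Join(partPath, "smeta.bin"), storage.FilePerm)
}
```

createTestTracePartWithSeriesMetadata would then reduce to calling trace.CreateTestPartForDump followed by writeTestSeriesMetadata(partPath, fileSystem, "service.name=test-service", "service.name=another-service"), and likewise for the stream and measure variants.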
##########
banyand/cmd/dump/measure.go:
##########
@@ -1294,3 +1339,38 @@ func measureTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][
return pbv1.NullTagValue
}
}
+
+func (ctx *measureDumpContext) parseAndDisplaySeriesMetadata(partID uint64, p *measurePart) error {
+ // Read all data from series metadata file
+ seqReader := p.seriesMetadata.SequentialRead()
+ defer seqReader.Close()
+
+ readMetadataBytes, err := io.ReadAll(seqReader)
+ if err != nil {
+ return fmt.Errorf("failed to read series metadata: %w", err)
+ }
+
+ if len(readMetadataBytes) == 0 {
+ return nil // Empty file, nothing to parse
+ }
+
+ // Unmarshal Documents
+ var docs index.Documents
+ if err := docs.Unmarshal(readMetadataBytes); err != nil {
+ return fmt.Errorf("failed to unmarshal series metadata: %w", err)
+ }
+
+ if len(docs) == 0 {
+ return nil // No documents
+ }
+
+ // Store EntityValues in partSeriesMap for use in CSV output
+ partMap := make(map[common.SeriesID]string)
+ for _, doc := range docs {
+ seriesID := common.SeriesID(convert.Hash(doc.EntityValues))
+ partMap[seriesID] = string(doc.EntityValues)
+ }
+ ctx.partSeriesMap[partID] = partMap
+
+ return nil
+}
Review Comment:
The parseAndDisplaySeriesMetadata function is duplicated identically across
trace.go, stream.go, and measure.go. This violates the DRY principle and makes
maintenance harder. Consider extracting this logic into a shared helper
function that accepts a generic reader interface, since the logic is identical
except for the receiver type.
##########
banyand/cmd/dump/measure_test.go:
##########
@@ -158,3 +163,91 @@ func TestDumpMeasurePartFormat(t *testing.T) {
t.Logf("Successfully parsed part with %d data points across %d primary blocks (metadata BlocksCount=%d)",
totalDataPoints, len(p.primaryBlockMetadata), p.partMetadata.BlocksCount)
}
+
+// TestDumpMeasurePartWithSeriesMetadata tests that the dump tool can parse smeta file.
+// This test creates a part with series metadata and verifies the dump tool can correctly parse it.
+func TestDumpMeasurePartWithSeriesMetadata(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create a part with series metadata
+ partPath, cleanup := createTestMeasurePartWithSeriesMetadata(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openMeasurePart(partID, filepath.Dir(partPath), fileSystem)
+ require.NoError(t, err, "should be able to open part with series metadata")
+ defer closeMeasurePart(p)
+
+ // Verify series metadata reader is available
+ assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+ // Create a dump context to test parsing
+ opts := measureDumpOptions{
+ shardPath: filepath.Dir(partPath),
+ segmentPath: tmpPath, // Not used for this test
+ verbose: false,
+ csvOutput: false,
+ }
+ ctx, err := newMeasureDumpContext(opts)
+ require.NoError(t, err)
+ if ctx != nil {
+ defer ctx.close()
+ }
+
+ // Test parsing series metadata
+ err = ctx.parseAndDisplaySeriesMetadata(partID, p)
+ require.NoError(t, err, "should be able to parse series metadata")
+
+ // Verify EntityValues are stored in partSeriesMap
+ require.NotNil(t, ctx.partSeriesMap, "partSeriesMap should be initialized")
+ partMap, exists := ctx.partSeriesMap[partID]
+ require.True(t, exists, "partSeriesMap should contain entry for partID")
+ require.NotNil(t, partMap, "partMap should not be nil")
+
+ // Verify EntityValues are correctly stored
+ // Calculate expected SeriesIDs from EntityValues
+ expectedSeriesID1 := common.SeriesID(convert.Hash([]byte("service.name=test-service")))
+ expectedSeriesID2 := common.SeriesID(convert.Hash([]byte("service.name=another-service")))
+
+ assert.Contains(t, partMap, expectedSeriesID1, "partMap should contain first series")
+ assert.Contains(t, partMap, expectedSeriesID2, "partMap should contain second series")
+ assert.Equal(t, "service.name=test-service", partMap[expectedSeriesID1], "EntityValues should match")
+ assert.Equal(t, "service.name=another-service", partMap[expectedSeriesID2], "EntityValues should match")
+}
+
+// createTestMeasurePartWithSeriesMetadata creates a test measure part with series metadata.
+func createTestMeasurePartWithSeriesMetadata(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ // Use measure package to create a part
+ partPath, cleanup := measure.CreateTestPartForDump(tmpPath, fileSystem)
+
+ // Create sample series metadata file
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+
+ // Create sample documents
+ docs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("service.name=test-service"),
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("service.name=another-service"),
+ },
+ }
+
+ seriesMetadataBytes, err := docs.Marshal()
+ if err != nil {
+ panic(fmt.Sprintf("failed to marshal series metadata documents: %v", err))
+ }
+ fs.MustFlush(fileSystem, seriesMetadataBytes, seriesMetadataPath, storage.FilePerm)
+
+ return partPath, cleanup
+}
Review Comment:
The createTestMeasurePartWithSeriesMetadata helper function is duplicated
with nearly identical logic across trace_test.go, stream_test.go, and
measure_test.go. The only differences are the package-specific
CreateTestPartForDump call and sample entity values. Consider extracting the
common logic into a shared test helper function to reduce duplication and
improve maintainability.
##########
banyand/cmd/dump/measure.go:
##########
@@ -589,6 +601,20 @@ func openMeasurePart(id uint64, root string, fileSystem fs.FileSystem) (*measure
return nil, fmt.Errorf("cannot open fv.bin: %w", err)
}
+ // Try to open series metadata file (optional, for backward compatibility)
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+ reader, err := fileSystem.OpenFile(seriesMetadataPath)
+ if err != nil {
+ // Only ignore file not found errors; other errors should be reported
+ var fsErr *fs.FileSystemError
+ if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+ fmt.Fprintf(os.Stderr, "Warning: Failed to open series metadata file %s: %v\n", seriesMetadataPath, err)
+ }
+ // File doesn't exist, it's okay - just continue without it
+ } else {
+ p.seriesMetadata = reader
+ }
Review Comment:
The error handling logic for opening the series metadata file (lines
604-616) is duplicated identically across openFilePart in trace.go,
openStreamPart in stream.go, and openMeasurePart in measure.go. This represents
significant code duplication. Consider extracting this into a shared helper
function to reduce duplication and improve maintainability.
##########
banyand/cmd/dump/stream_test.go:
##########
@@ -148,3 +153,92 @@ func TestDumpStreamPartFormat(t *testing.T) {
func createTestStreamPartForDump(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
return stream.CreateTestPartForDump(tmpPath, fileSystem)
}
+
+// TestDumpStreamPartWithSeriesMetadata tests that the dump tool can parse smeta file.
+// This test creates a part with series metadata and verifies the dump tool can correctly parse it.
+func TestDumpStreamPartWithSeriesMetadata(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create a part with series metadata
+ partPath, cleanup := createTestStreamPartWithSeriesMetadata(tmpPath, fileSystem)
+ defer cleanup()
+
+ // Extract part ID from path
+ partName := filepath.Base(partPath)
+ partID, err := strconv.ParseUint(partName, 16, 64)
+ require.NoError(t, err, "part directory should have valid hex name")
+
+ // Parse the part using dump tool functions
+ p, err := openStreamPart(partID, filepath.Dir(partPath), fileSystem)
+ require.NoError(t, err, "should be able to open part with series metadata")
+ defer closeStreamPart(p)
+
+ // Verify series metadata reader is available
+ assert.NotNil(t, p.seriesMetadata, "series metadata reader should be available")
+
+ // Create a dump context to test parsing
+ opts := streamDumpOptions{
+ shardPath: filepath.Dir(partPath),
+ segmentPath: tmpPath, // Not used for this test
+ verbose: false,
+ csvOutput: false,
+ }
+ ctx, err := newStreamDumpContext(opts)
+ require.NoError(t, err)
+ if ctx != nil {
+ defer ctx.close()
+ }
+
+ // Test parsing series metadata
+ err = ctx.parseAndDisplaySeriesMetadata(partID, p)
+ require.NoError(t, err, "should be able to parse series metadata")
+
+ // Verify EntityValues are stored in partSeriesMap
+ require.NotNil(t, ctx.partSeriesMap, "partSeriesMap should be initialized")
+ partMap, exists := ctx.partSeriesMap[partID]
+ require.True(t, exists, "partSeriesMap should contain entry for partID")
+ require.NotNil(t, partMap, "partMap should not be nil")
+
+ // Verify EntityValues are correctly stored
+ // Calculate expected SeriesIDs from EntityValues
+ expectedSeriesID1 := common.SeriesID(convert.Hash([]byte("test=entity1")))
+ expectedSeriesID2 := common.SeriesID(convert.Hash([]byte("test=entity2")))
+
+ assert.Contains(t, partMap, expectedSeriesID1, "partMap should contain first series")
+ assert.Contains(t, partMap, expectedSeriesID2, "partMap should contain second series")
+ assert.Equal(t, "test=entity1", partMap[expectedSeriesID1], "EntityValues should match")
+ assert.Equal(t, "test=entity2", partMap[expectedSeriesID2], "EntityValues should match")
+}
+
+// createTestStreamPartWithSeriesMetadata creates a test stream part with series metadata.
+func createTestStreamPartWithSeriesMetadata(tmpPath string, fileSystem fs.FileSystem) (string, func()) {
+ // Use stream package to create a part
+ partPath, cleanup := stream.CreateTestPartForDump(tmpPath, fileSystem)
+
+ // Create sample series metadata file
+ // This is a simplified version - in real scenarios, smeta is created during flush
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+
+ // Create sample documents
+ docs := index.Documents{
+ {
+ DocID: 1,
+ EntityValues: []byte("test=entity1"),
+ },
+ {
+ DocID: 2,
+ EntityValues: []byte("test=entity2"),
+ },
+ }
+
+ seriesMetadataBytes, err := docs.Marshal()
+ if err != nil {
+ panic(fmt.Sprintf("failed to marshal series metadata documents: %v", err))
+ }
+ fs.MustFlush(fileSystem, seriesMetadataBytes, seriesMetadataPath, storage.FilePerm)
+
+ return partPath, cleanup
+}
Review Comment:
The createTestStreamPartWithSeriesMetadata helper function is duplicated
with nearly identical logic across trace_test.go, stream_test.go, and
measure_test.go. The only differences are the package-specific
CreateTestPartForDump call and sample entity values. Consider extracting the
common logic into a shared test helper function to reduce duplication and
improve maintainability.
##########
banyand/cmd/dump/trace.go:
##########
@@ -575,6 +587,20 @@ func openFilePart(id uint64, root string, fileSystem fs.FileSystem) (*part, erro
return nil, fmt.Errorf("cannot open spans.bin: %w", err)
}
+ // Try to open series metadata file (optional, for backward compatibility)
+ seriesMetadataPath := filepath.Join(partPath, "smeta.bin")
+ reader, err := fileSystem.OpenFile(seriesMetadataPath)
+ if err != nil {
+ // Only ignore file not found errors; other errors should be reported
+ var fsErr *fs.FileSystemError
+ if !errors.As(err, &fsErr) || fsErr.Code != fs.IsNotExistError {
+ fmt.Fprintf(os.Stderr, "Warning: Failed to open series metadata file %s: %v\n", seriesMetadataPath, err)
+ }
+ // File doesn't exist, it's okay - just continue without it
+ } else {
+ p.seriesMetadata = reader
+ }
Review Comment:
The error handling logic for opening the series metadata file (lines
590-602) is duplicated identically across openFilePart in trace.go,
openStreamPart in stream.go, and openMeasurePart in measure.go. This represents
significant code duplication. Consider extracting this into a shared helper
function to reduce duplication and improve maintainability.
##########
banyand/cmd/dump/stream.go:
##########
@@ -1097,3 +1141,38 @@ func streamTagValueDecoder(valueType pbv1.ValueType, value []byte, valueArr [][]
return pbv1.NullTagValue
}
}
+
+func (ctx *streamDumpContext) parseAndDisplaySeriesMetadata(partID uint64, p *streamPart) error {
+ // Read all data from series metadata file
+ seqReader := p.seriesMetadata.SequentialRead()
+ defer seqReader.Close()
+
+ readMetadataBytes, err := io.ReadAll(seqReader)
+ if err != nil {
+ return fmt.Errorf("failed to read series metadata: %w", err)
+ }
+
+ if len(readMetadataBytes) == 0 {
+ return nil // Empty file, nothing to parse
+ }
+
+ // Unmarshal Documents
+ var docs index.Documents
+ if err := docs.Unmarshal(readMetadataBytes); err != nil {
+ return fmt.Errorf("failed to unmarshal series metadata: %w", err)
+ }
+
+ if len(docs) == 0 {
+ return nil // No documents
+ }
+
+ // Store EntityValues in partSeriesMap for use in CSV output
+ partMap := make(map[common.SeriesID]string)
+ for _, doc := range docs {
+ seriesID := common.SeriesID(convert.Hash(doc.EntityValues))
+ partMap[seriesID] = string(doc.EntityValues)
+ }
+ ctx.partSeriesMap[partID] = partMap
+
+ return nil
+}
Review Comment:
The parseAndDisplaySeriesMetadata function is duplicated identically across
trace.go, stream.go, and measure.go. This violates the DRY principle and makes
maintenance harder. Consider extracting this logic into a shared helper
function that accepts a generic reader interface, since the logic is identical
except for the receiver type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]