This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
     new 69179312 Implement versioning support in API (#768)
69179312 is described below

commit 69179312e966360bad03d2b4a7f4e47177429e9f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Sep 12 16:10:04 2025 +0800

    Implement versioning support in API (#768)
---
 CHANGES.md                                      |   1 +
 api/proto/banyandb/cluster/v1/rpc.proto         |  35 +++
 api/proto/banyandb/model/v1/write.proto         |   2 +
 banyand/internal/storage/version.go             |  10 +
 banyand/queue/pub/chunked_sync.go               |  12 +
 banyand/queue/pub/pub.go                        |   7 +
 banyand/queue/sub/chunked_sync.go               |  89 ++++++-
 banyand/queue/sub/sub.go                        |  88 +++++++
 banyand/queue/sub/version_compatibility_test.go | 337 ++++++++++++++++++++++++
 docs/api-reference.md                           |  47 ++++
 10 files changed, 626 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 56dcd1e1..dfa07445 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -41,6 +41,7 @@ Release Notes.
 - Implement a resilient publisher with circuit breaker and retry logic with exponential backoff.
 - Optimize gRPC message size limits: increase server max receive message size to 16MB and client max receive message size to 32MB for better handling of large time-series data blocks.
 - Add query access log support for stream, measure, trace, and property services to capture and log all query requests for monitoring and debugging purposes.
+- Implement comprehensive version compatibility checking for both regular data transmission and chunked sync operations, ensuring proper API version and file format version validation with detailed error reporting and graceful handling of version mismatches.
### Bug Fixes diff --git a/api/proto/banyandb/cluster/v1/rpc.proto b/api/proto/banyandb/cluster/v1/rpc.proto index 8f49294f..0974843f 100644 --- a/api/proto/banyandb/cluster/v1/rpc.proto +++ b/api/proto/banyandb/cluster/v1/rpc.proto @@ -23,11 +23,37 @@ import "banyandb/model/v1/write.proto"; option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"; +message VersionInfo { + // file_format_version indicates the file format version used + string file_format_version = 1; + // compatible_file_format_version lists backward compatible versions + repeated string compatible_file_format_version = 2; + // api_version indicates the API semantic version + string api_version = 3; +} + +message VersionCompatibility { + // supported indicates whether the client version is supported + bool supported = 1; + // server_api_version is the API version of the server + string server_api_version = 2; + // supported_api_versions lists API versions supported by the server + repeated string supported_api_versions = 3; + // server_file_format_version is the file format version of the server + string server_file_format_version = 4; + // supported_file_format_versions lists file format versions supported by the server + repeated string supported_file_format_versions = 5; + // reason provides human-readable explanation of version incompatibility + string reason = 6; +} + message SendRequest { string topic = 1; uint64 message_id = 2; bytes body = 3; bool batch_mod = 4; + // version_info contains version information + VersionInfo version_info = 5; } message SendResponse { @@ -35,6 +61,8 @@ message SendResponse { string error = 2; bytes body = 3; model.v1.Status status = 4; + // version_compatibility contains version compatibility information when status indicates version issues + VersionCompatibility version_compatibility = 5; } message HealthCheckRequest { @@ -59,6 +87,9 @@ message SyncPartRequest { SyncMetadata metadata = 7; // Sent with first chunk (chunk_index = 
0). SyncCompletion completion = 8; // Sent with last chunk to finalize. } + + // version_info contains version information + VersionInfo version_info = 9; } // Information about a part contained within a chunk. @@ -103,6 +134,8 @@ message SyncPartResponse { SyncStatus status = 3; string error = 4; SyncResult sync_result = 5; // Final result when sync completes. + // version_compatibility contains version compatibility information when status indicates version issues + VersionCompatibility version_compatibility = 6; } // SyncResult contains the result of a sync operation. @@ -130,6 +163,8 @@ enum SyncStatus { SYNC_STATUS_CHUNK_OUT_OF_ORDER = 3; // Chunk received out of expected order. SYNC_STATUS_SESSION_NOT_FOUND = 4; // Session ID not recognized. SYNC_STATUS_SYNC_COMPLETE = 5; // Entire sync operation completed successfully. + SYNC_STATUS_VERSION_UNSUPPORTED = 6; // Version not supported for sync operations. + SYNC_STATUS_FORMAT_VERSION_MISMATCH = 7; // File format version incompatible. } service Service { diff --git a/api/proto/banyandb/model/v1/write.proto b/api/proto/banyandb/model/v1/write.proto index 7a683407..3f25e1ee 100644 --- a/api/proto/banyandb/model/v1/write.proto +++ b/api/proto/banyandb/model/v1/write.proto @@ -31,4 +31,6 @@ enum Status { STATUS_EXPIRED_SCHEMA = 4; STATUS_INTERNAL_ERROR = 5; STATUS_DISK_FULL = 6; + STATUS_VERSION_UNSUPPORTED = 7; // Client version not supported + STATUS_VERSION_DEPRECATED = 8; // Client version deprecated but still supported } diff --git a/banyand/internal/storage/version.go b/banyand/internal/storage/version.go index 2fdac4ca..6f9d6ccf 100644 --- a/banyand/internal/storage/version.go +++ b/banyand/internal/storage/version.go @@ -68,3 +68,13 @@ func readCompatibleVersions() []string { } return vv } + +// GetCurrentVersion returns the current storage version. +func GetCurrentVersion() string { + return currentVersion +} + +// GetCompatibleVersions returns the list of compatible storage versions. 
+func GetCompatibleVersions() []string { + return compatibleVersions +} diff --git a/banyand/queue/pub/chunked_sync.go b/banyand/queue/pub/chunked_sync.go index 8e6cb679..35afb070 100644 --- a/banyand/queue/pub/chunked_sync.go +++ b/banyand/queue/pub/chunked_sync.go @@ -28,7 +28,9 @@ import ( "google.golang.org/grpc" + apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" @@ -287,6 +289,11 @@ func (c *chunkedSyncClient) streamPartsAsChunks( TotalChunks: totalChunks, }, }, + VersionInfo: &clusterv1.VersionInfo{ + ApiVersion: apiversion.Version, + FileFormatVersion: storage.GetCurrentVersion(), + CompatibleFileFormatVersion: storage.GetCompatibleVersions(), + }, } if err := stream.Send(completionReq); err != nil { @@ -319,6 +326,11 @@ func (c *chunkedSyncClient) sendChunk( ChunkData: chunkData, ChunkChecksum: chunkChecksum, PartsInfo: partsInfo, + VersionInfo: &clusterv1.VersionInfo{ + ApiVersion: apiversion.Version, + FileFormatVersion: storage.GetCurrentVersion(), + CompatibleFileFormatVersion: storage.GetCompatibleVersions(), + }, } if isFirstChunk { diff --git a/banyand/queue/pub/pub.go b/banyand/queue/pub/pub.go index 7a40046e..74fa2432 100644 --- a/banyand/queue/pub/pub.go +++ b/banyand/queue/pub/pub.go @@ -35,9 +35,11 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + 
"github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" @@ -336,6 +338,11 @@ func messageToRequest(topic bus.Topic, m bus.Message) (*clusterv1.SendRequest, e Topic: topic.String(), MessageId: uint64(m.ID()), BatchMod: m.BatchModeEnabled(), + VersionInfo: &clusterv1.VersionInfo{ + ApiVersion: apiversion.Version, + FileFormatVersion: storage.GetCurrentVersion(), + CompatibleFileFormatVersion: storage.GetCompatibleVersions(), + }, } switch data := m.Data().(type) { diff --git a/banyand/queue/sub/chunked_sync.go b/banyand/queue/sub/chunked_sync.go index c87191e6..2db9c4ee 100644 --- a/banyand/queue/sub/chunked_sync.go +++ b/banyand/queue/sub/chunked_sync.go @@ -25,11 +25,75 @@ import ( "time" "github.com/apache/skywalking-banyandb/api/data" + apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/bus" ) +func checkSyncVersionCompatibility(versionInfo *clusterv1.VersionInfo) (*clusterv1.VersionCompatibility, clusterv1.SyncStatus) { + if versionInfo == nil { + return &clusterv1.VersionCompatibility{ + Supported: true, + ServerApiVersion: apiversion.Version, + SupportedApiVersions: []string{apiversion.Version}, + ServerFileFormatVersion: storage.GetCurrentVersion(), + SupportedFileFormatVersions: storage.GetCompatibleVersions(), + Reason: "No version info provided, assuming compatible", + }, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED + } + + serverAPIVersion := apiversion.Version + serverFileFormatVersion := storage.GetCurrentVersion() + compatibleFileFormatVersions := storage.GetCompatibleVersions() + + // Check API version 
compatibility + apiCompatible := versionInfo.ApiVersion == serverAPIVersion + + // Check file format version compatibility + fileFormatCompatible := false + if versionInfo.FileFormatVersion == serverFileFormatVersion { + fileFormatCompatible = true + } else { + // Check if client's file format version is in our compatible list + for _, compatVer := range compatibleFileFormatVersions { + if compatVer == versionInfo.FileFormatVersion { + fileFormatCompatible = true + break + } + } + } + + versionCompatibility := &clusterv1.VersionCompatibility{ + ServerApiVersion: serverAPIVersion, + SupportedApiVersions: []string{serverAPIVersion}, + ServerFileFormatVersion: serverFileFormatVersion, + SupportedFileFormatVersions: compatibleFileFormatVersions, + } + + switch { + case !apiCompatible && !fileFormatCompatible: + versionCompatibility.Supported = false + versionCompatibility.Reason = fmt.Sprintf("API version %s not supported (server: %s) and file format version %s not compatible (server: %s, supported: %v)", + versionInfo.ApiVersion, serverAPIVersion, versionInfo.FileFormatVersion, serverFileFormatVersion, compatibleFileFormatVersions) + return versionCompatibility, clusterv1.SyncStatus_SYNC_STATUS_VERSION_UNSUPPORTED + case !apiCompatible: + versionCompatibility.Supported = false + versionCompatibility.Reason = fmt.Sprintf("API version %s not supported (server: %s)", versionInfo.ApiVersion, serverAPIVersion) + return versionCompatibility, clusterv1.SyncStatus_SYNC_STATUS_VERSION_UNSUPPORTED + case !fileFormatCompatible: + versionCompatibility.Supported = false + versionCompatibility.Reason = fmt.Sprintf("File format version %s not compatible (server: %s, supported: %v)", + versionInfo.FileFormatVersion, serverFileFormatVersion, compatibleFileFormatVersions) + return versionCompatibility, clusterv1.SyncStatus_SYNC_STATUS_FORMAT_VERSION_MISMATCH + } + + versionCompatibility.Supported = true + versionCompatibility.Reason = "Client version compatible with server" + return 
versionCompatibility, clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED +} + type chunkBuffer struct { chunks map[uint32]*clusterv1.SyncPartRequest lastActivity time.Time @@ -122,6 +186,21 @@ func (s *server) SyncPart(stream clusterv1.ChunkedSyncService_SyncPartServer) er } func (s *server) processChunk(stream clusterv1.ChunkedSyncService_SyncPartServer, session *syncSession, req *clusterv1.SyncPartRequest) error { + // Check version compatibility on every chunk + if req.VersionInfo != nil { + versionCompatibility, status := checkSyncVersionCompatibility(req.VersionInfo) + if status != clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED { + s.log.Warn(). + Str("session_id", req.SessionId). + Str("client_api_version", req.VersionInfo.ApiVersion). + Str("client_file_format_version", req.VersionInfo.FileFormatVersion). + Str("reason", versionCompatibility.Reason). + Msg("sync version compatibility check failed") + + return s.sendResponse(stream, req, status, versionCompatibility.Reason, versionCompatibility) + } + } + if !s.enableChunkReordering { return s.processChunkSequential(stream, session, req) } @@ -444,14 +523,20 @@ func (s *server) sendResponse( req *clusterv1.SyncPartRequest, status clusterv1.SyncStatus, errorMsg string, - syncResult *clusterv1.SyncResult, + result interface{}, ) error { resp := &clusterv1.SyncPartResponse{ SessionId: req.SessionId, ChunkIndex: req.ChunkIndex, Status: status, Error: errorMsg, - SyncResult: syncResult, + } + + switch r := result.(type) { + case *clusterv1.SyncResult: + resp.SyncResult = r + case *clusterv1.VersionCompatibility: + resp.VersionCompatibility = r } return stream.Send(resp) diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go index 1e7023ce..479ab5bc 100644 --- a/banyand/queue/sub/sub.go +++ b/banyand/queue/sub/sub.go @@ -28,11 +28,76 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/api/data" + apiversion 
"github.com/apache/skywalking-banyandb/api/proto/banyandb" clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" ) +func checkVersionCompatibility(versionInfo *clusterv1.VersionInfo) (*clusterv1.VersionCompatibility, modelv1.Status) { + if versionInfo == nil { + return &clusterv1.VersionCompatibility{ + Supported: true, + ServerApiVersion: apiversion.Version, + SupportedApiVersions: []string{apiversion.Version}, + ServerFileFormatVersion: storage.GetCurrentVersion(), + SupportedFileFormatVersions: storage.GetCompatibleVersions(), + Reason: "No version info provided, assuming compatible", + }, modelv1.Status_STATUS_SUCCEED + } + + serverAPIVersion := apiversion.Version + serverFileFormatVersion := storage.GetCurrentVersion() + compatibleFileFormatVersions := storage.GetCompatibleVersions() + + // Check API version compatibility + apiCompatible := versionInfo.ApiVersion == serverAPIVersion + + // Check file format version compatibility + fileFormatCompatible := false + if versionInfo.FileFormatVersion == serverFileFormatVersion { + fileFormatCompatible = true + } else { + // Check if client's file format version is in our compatible list + for _, compatVer := range compatibleFileFormatVersions { + if compatVer == versionInfo.FileFormatVersion { + fileFormatCompatible = true + break + } + } + } + + versionCompatibility := &clusterv1.VersionCompatibility{ + ServerApiVersion: serverAPIVersion, + SupportedApiVersions: []string{serverAPIVersion}, + ServerFileFormatVersion: serverFileFormatVersion, + SupportedFileFormatVersions: compatibleFileFormatVersions, + } + + switch { + case !apiCompatible && !fileFormatCompatible: + versionCompatibility.Supported = false + versionCompatibility.Reason = 
fmt.Sprintf("API version %s not supported (server: %s) and file format version %s not compatible (server: %s, supported: %v)", + versionInfo.ApiVersion, serverAPIVersion, versionInfo.FileFormatVersion, serverFileFormatVersion, compatibleFileFormatVersions) + return versionCompatibility, modelv1.Status_STATUS_VERSION_UNSUPPORTED + case !apiCompatible: + versionCompatibility.Supported = false + versionCompatibility.Reason = fmt.Sprintf("API version %s not supported (server: %s)", versionInfo.ApiVersion, serverAPIVersion) + return versionCompatibility, modelv1.Status_STATUS_VERSION_UNSUPPORTED + case !fileFormatCompatible: + versionCompatibility.Supported = false + versionCompatibility.Reason = fmt.Sprintf("File format version %s not compatible (server: %s, supported: %v)", + versionInfo.FileFormatVersion, serverFileFormatVersion, compatibleFileFormatVersions) + return versionCompatibility, modelv1.Status_STATUS_VERSION_UNSUPPORTED + } + + versionCompatibility.Supported = true + versionCompatibility.Reason = "Client version compatible with server" + return versionCompatibility, modelv1.Status_STATUS_SUCCEED +} + func (s *server) Send(stream clusterv1.Service_SendServer) error { ctx := stream.Context() var topic *bus.Topic @@ -59,6 +124,29 @@ func (s *server) Send(stream clusterv1.Service_SendServer) error { if err != nil { return s.handleRecvError(err) } + + // Check version compatibility on first message received + if writeEntity.VersionInfo != nil { + versionCompatibility, status := checkVersionCompatibility(writeEntity.VersionInfo) + if status != modelv1.Status_STATUS_SUCCEED { + s.log.Warn(). + Str("client_api_version", writeEntity.VersionInfo.ApiVersion). + Str("client_file_format_version", writeEntity.VersionInfo.FileFormatVersion). + Str("reason", versionCompatibility.Reason). 
+ Msg("version compatibility check failed") + + if errSend := stream.Send(&clusterv1.SendResponse{ + MessageId: writeEntity.MessageId, + Status: status, + Error: versionCompatibility.Reason, + VersionCompatibility: versionCompatibility, + }); errSend != nil { + s.log.Error().Err(errSend).Msg("failed to send version incompatibility response") + } + return fmt.Errorf("version incompatibility: %s", versionCompatibility.Reason) + } + } + s.metrics.totalMsgReceived.Inc(1, writeEntity.Topic) if writeEntity.Topic != "" && topic == nil { t, ok := data.TopicMap[writeEntity.Topic] diff --git a/banyand/queue/sub/version_compatibility_test.go b/banyand/queue/sub/version_compatibility_test.go new file mode 100644 index 00000000..9fc515da --- /dev/null +++ b/banyand/queue/sub/version_compatibility_test.go @@ -0,0 +1,337 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package sub + +import ( + "testing" + + apiversion "github.com/apache/skywalking-banyandb/api/proto/banyandb" + clusterv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" +) + +func TestCheckVersionCompatibility(t *testing.T) { + serverAPIVersion := apiversion.Version + serverFileFormatVersion := storage.GetCurrentVersion() + compatibleVersions := storage.GetCompatibleVersions() + + tests := []struct { + versionInfo *clusterv1.VersionInfo + name string + expectReason string + expectedStatus modelv1.Status + expectSupported bool + }{ + { + name: "nil version info - should be compatible", + versionInfo: nil, + expectedStatus: modelv1.Status_STATUS_SUCCEED, + expectSupported: true, + expectReason: "No version info provided, assuming compatible", + }, + { + name: "matching versions - should be compatible", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: serverFileFormatVersion, + }, + expectedStatus: modelv1.Status_STATUS_SUCCEED, + expectSupported: true, + expectReason: "Client version compatible with server", + }, + { + name: "mismatched API version - should be incompatible", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: "0.8", // Different from server version + FileFormatVersion: serverFileFormatVersion, + }, + expectedStatus: modelv1.Status_STATUS_VERSION_UNSUPPORTED, + expectSupported: false, + expectReason: "API version 0.8 not supported", + }, + { + name: "incompatible file format version - should be incompatible", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: "999.0.0", // Non-existent version + }, + expectedStatus: modelv1.Status_STATUS_VERSION_UNSUPPORTED, + expectSupported: false, + expectReason: "File format version 999.0.0 not compatible", + }, + { + name: "both API and file format incompatible 
- should be incompatible", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: "0.8", + FileFormatVersion: "999.0.0", + }, + expectedStatus: modelv1.Status_STATUS_VERSION_UNSUPPORTED, + expectSupported: false, + expectReason: "API version 0.8 not supported", + }, + { + name: "compatible file format from compatible list", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: getFirstCompatibleVersion(compatibleVersions, serverFileFormatVersion), + }, + expectedStatus: modelv1.Status_STATUS_SUCCEED, + expectSupported: true, + expectReason: "Client version compatible with server", + }, + { + name: "empty version strings - should be incompatible", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: "", + FileFormatVersion: "", + }, + expectedStatus: modelv1.Status_STATUS_VERSION_UNSUPPORTED, + expectSupported: false, + expectReason: "API version not supported", + }, + { + name: "with compatible file format versions array", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: serverFileFormatVersion, + CompatibleFileFormatVersion: []string{"1.2.0", "1.1.0"}, + }, + expectedStatus: modelv1.Status_STATUS_SUCCEED, + expectSupported: true, + expectReason: "Client version compatible with server", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + compatibility, status := checkVersionCompatibility(tt.versionInfo) + + // Check status + if status != tt.expectedStatus { + t.Errorf("expected status %v, got %v", tt.expectedStatus, status) + } + + // Check supported flag + if compatibility.Supported != tt.expectSupported { + t.Errorf("expected supported %v, got %v", tt.expectSupported, compatibility.Supported) + } + + // Check reason contains expected content + if len(tt.expectReason) > 0 && len(compatibility.Reason) > 0 { + if !contains(compatibility.Reason, tt.expectReason) { + t.Errorf("expected reason to contain '%s', got '%s'", tt.expectReason, compatibility.Reason) + } 
+ } + + // Check server version info is always populated + if compatibility.ServerApiVersion != serverAPIVersion { + t.Errorf("expected server API version %s, got %s", serverAPIVersion, compatibility.ServerApiVersion) + } + + if compatibility.ServerFileFormatVersion != serverFileFormatVersion { + t.Errorf("expected server file format version %s, got %s", serverFileFormatVersion, compatibility.ServerFileFormatVersion) + } + + // Check supported versions are populated + if len(compatibility.SupportedApiVersions) == 0 { + t.Error("expected supported API versions to be populated") + } + + if len(compatibility.SupportedFileFormatVersions) == 0 { + t.Error("expected supported file format versions to be populated") + } + }) + } +} + +func TestCheckSyncVersionCompatibility(t *testing.T) { + serverAPIVersion := apiversion.Version + serverFileFormatVersion := storage.GetCurrentVersion() + compatibleVersions := storage.GetCompatibleVersions() + + tests := []struct { + versionInfo *clusterv1.VersionInfo + name string + expectReason string + expectedStatus clusterv1.SyncStatus + expectSupported bool + }{ + { + name: "nil version info - should be compatible", + versionInfo: nil, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED, + expectSupported: true, + expectReason: "No version info provided, assuming compatible", + }, + { + name: "matching versions - should be compatible", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: serverFileFormatVersion, + }, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED, + expectSupported: true, + expectReason: "Client version compatible with server", + }, + { + name: "mismatched API version - should be version unsupported", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: "0.8", + FileFormatVersion: serverFileFormatVersion, + }, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_VERSION_UNSUPPORTED, + expectSupported: false, + expectReason: "API version 0.8 not 
supported", + }, + { + name: "incompatible file format - should be format version mismatch", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: "999.0.0", + }, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_FORMAT_VERSION_MISMATCH, + expectSupported: false, + expectReason: "File format version 999.0.0 not compatible", + }, + { + name: "both incompatible - should prioritize API version error", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: "0.8", + FileFormatVersion: "999.0.0", + }, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_VERSION_UNSUPPORTED, + expectSupported: false, + expectReason: "API version 0.8 not supported", + }, + { + name: "compatible file format from list", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: getFirstCompatibleVersion(compatibleVersions, serverFileFormatVersion), + }, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED, + expectSupported: true, + expectReason: "Client version compatible with server", + }, + { + name: "empty version strings - should be incompatible", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: "", + FileFormatVersion: "", + }, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_VERSION_UNSUPPORTED, + expectSupported: false, + expectReason: "API version not supported", + }, + { + name: "with compatible file format versions array", + versionInfo: &clusterv1.VersionInfo{ + ApiVersion: serverAPIVersion, + FileFormatVersion: serverFileFormatVersion, + CompatibleFileFormatVersion: []string{"1.2.0", "1.1.0"}, + }, + expectedStatus: clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED, + expectSupported: true, + expectReason: "Client version compatible with server", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + compatibility, status := checkSyncVersionCompatibility(tt.versionInfo) + + // Check status + if status != tt.expectedStatus { + t.Errorf("expected status %v, got %v", 
tt.expectedStatus, status) + } + + // Check supported flag + if compatibility.Supported != tt.expectSupported { + t.Errorf("expected supported %v, got %v", tt.expectSupported, compatibility.Supported) + } + + // Check reason contains expected content + if len(tt.expectReason) > 0 && len(compatibility.Reason) > 0 { + if !contains(compatibility.Reason, tt.expectReason) { + t.Errorf("expected reason to contain '%s', got '%s'", tt.expectReason, compatibility.Reason) + } + } + + // Check server version info is always populated + if compatibility.ServerApiVersion != serverAPIVersion { + t.Errorf("expected server API version %s, got %s", serverAPIVersion, compatibility.ServerApiVersion) + } + + if compatibility.ServerFileFormatVersion != serverFileFormatVersion { + t.Errorf("expected server file format version %s, got %s", serverFileFormatVersion, compatibility.ServerFileFormatVersion) + } + + // Check supported versions are populated + if len(compatibility.SupportedApiVersions) == 0 { + t.Error("expected supported API versions to be populated") + } + + if len(compatibility.SupportedFileFormatVersions) == 0 { + t.Error("expected supported file format versions to be populated") + } + }) + } +} + +// Helper function to get first compatible version that differs from current. +func getFirstCompatibleVersion(compatibleVersions []string, currentVersion string) string { + for _, version := range compatibleVersions { + if version != currentVersion { + return version + } + } + // If no different version found, return current version (still compatible) + return currentVersion +} + +// Helper function to check if a string contains a substring. +func contains(str, substr string) bool { + return len(str) >= len(substr) && (str == substr || + (len(str) > len(substr) && + (findInString(str, substr) != -1))) +} + +// Simple string search helper. 
+func findInString(str, substr string) int { + if len(substr) == 0 { + return 0 + } + if len(str) < len(substr) { + return -1 + } + + for i := 0; i <= len(str)-len(substr); i++ { + match := true + for j := 0; j < len(substr); j++ { + if str[i+j] != substr[j] { + match = false + break + } + } + if match { + return i + } + } + return -1 +} diff --git a/docs/api-reference.md b/docs/api-reference.md index 67a5e8f9..5237d4f8 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -19,6 +19,8 @@ - [SyncPartRequest](#banyandb-cluster-v1-SyncPartRequest) - [SyncPartResponse](#banyandb-cluster-v1-SyncPartResponse) - [SyncResult](#banyandb-cluster-v1-SyncResult) + - [VersionCompatibility](#banyandb-cluster-v1-VersionCompatibility) + - [VersionInfo](#banyandb-cluster-v1-VersionInfo) - [SyncStatus](#banyandb-cluster-v1-SyncStatus) @@ -349,6 +351,8 @@ Status is the response status for write | STATUS_EXPIRED_SCHEMA | 4 | | | STATUS_INTERNAL_ERROR | 5 | | | STATUS_DISK_FULL | 6 | | +| STATUS_VERSION_UNSUPPORTED | 7 | Client version not supported | +| STATUS_VERSION_DEPRECATED | 8 | Client version deprecated but still supported | @@ -466,6 +470,7 @@ PartResult contains the result for individual parts. | message_id | [uint64](#uint64) | | | | body | [bytes](#bytes) | | | | batch_mod | [bool](#bool) | | | +| version_info | [VersionInfo](#banyandb-cluster-v1-VersionInfo) | | version_info contains version information | @@ -484,6 +489,7 @@ PartResult contains the result for individual parts. | error | [string](#string) | | | | body | [bytes](#bytes) | | | | status | [banyandb.model.v1.Status](#banyandb-model-v1-Status) | | | +| version_compatibility | [VersionCompatibility](#banyandb-cluster-v1-VersionCompatibility) | | version_compatibility contains version compatibility information when status indicates version issues | @@ -541,6 +547,7 @@ Chunked Sync Service Messages. 
| parts_info | [PartInfo](#banyandb-cluster-v1-PartInfo) | repeated | Information about parts contained in this chunk. | | metadata | [SyncMetadata](#banyandb-cluster-v1-SyncMetadata) | | Sent with first chunk (chunk_index = 0). | | completion | [SyncCompletion](#banyandb-cluster-v1-SyncCompletion) | | Sent with last chunk to finalize. | +| version_info | [VersionInfo](#banyandb-cluster-v1-VersionInfo) | | version_info contains version information | @@ -560,6 +567,7 @@ SyncPartResponse contains the response for a sync part request. | status | [SyncStatus](#banyandb-cluster-v1-SyncStatus) | | | | error | [string](#string) | | | | sync_result | [SyncResult](#banyandb-cluster-v1-SyncResult) | | Final result when sync completes. | +| version_compatibility | [VersionCompatibility](#banyandb-cluster-v1-VersionCompatibility) | | version_compatibility contains version compatibility information when status indicates version issues | @@ -585,6 +593,43 @@ SyncResult contains the result of a sync operation. 
+ +<a name="banyandb-cluster-v1-VersionCompatibility"></a> + +### VersionCompatibility + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| supported | [bool](#bool) | | supported indicates whether the client version is supported | +| server_api_version | [string](#string) | | server_api_version is the API version of the server | +| supported_api_versions | [string](#string) | repeated | supported_api_versions lists API versions supported by the server | +| server_file_format_version | [string](#string) | | server_file_format_version is the file format version of the server | +| supported_file_format_versions | [string](#string) | repeated | supported_file_format_versions lists file format versions supported by the server | +| reason | [string](#string) | | reason provides human-readable explanation of version incompatibility | + + + + + + +<a name="banyandb-cluster-v1-VersionInfo"></a> + +### VersionInfo + + + +| Field | Type | Label | Description | +| ----- | ---- | ----- | ----------- | +| file_format_version | [string](#string) | | file_format_version indicates the file format version used | +| compatible_file_format_version | [string](#string) | repeated | compatible_file_format_version lists backward compatible versions | +| api_version | [string](#string) | | api_version indicates the API semantic version | + + + + + @@ -601,6 +646,8 @@ SyncStatus represents the status of a sync operation. | SYNC_STATUS_CHUNK_OUT_OF_ORDER | 3 | Chunk received out of expected order. | | SYNC_STATUS_SESSION_NOT_FOUND | 4 | Session ID not recognized. | | SYNC_STATUS_SYNC_COMPLETE | 5 | Entire sync operation completed successfully. | +| SYNC_STATUS_VERSION_UNSUPPORTED | 6 | Version not supported for sync operations. | +| SYNC_STATUS_FORMAT_VERSION_MISMATCH | 7 | File format version incompatible. |