This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch liaison-lb in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 9b09ef1535cf31f8b4fe55cbca04a480e2d19ef8 Merge: 556635b7 bc60047d Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Wed Jun 18 09:02:58 2025 +0800 Merge remote-tracking branch 'origin/main' into liaison-lb CHANGES.md | 1 + api/proto/banyandb/property/v1/rpc.proto | 3 + banyand/liaison/grpc/property.go | 261 ++++++++++++++++-------- banyand/liaison/grpc/server.go | 9 +- banyand/property/db.go | 13 +- banyand/property/listener.go | 12 +- banyand/property/property.go | 19 +- banyand/property/shard.go | 105 ++++++++-- bydbctl/internal/cmd/property_test.go | 327 ++++++++++++++++++++++--------- docs/api-reference.md | 1 + pkg/convert/number.go | 16 ++ pkg/convert/number_test.go | 19 ++ pkg/index/index.go | 4 +- pkg/index/inverted/inverted.go | 4 + pkg/index/inverted/inverted_series.go | 9 + pkg/test/setup/setup.go | 47 +++-- ui/README.md | 2 +- 17 files changed, 627 insertions(+), 225 deletions(-) diff --cc CHANGES.md index 3a25d826,43cc1e2b..582827ea --- a/CHANGES.md +++ b/CHANGES.md @@@ -14,7 -14,7 +14,8 @@@ Release Notes - Replica: Support configurable replica count on Group. - Replica: Move the TopN pre-calculation flow from the Data Node to the Liaison Node. - Add a wait and retry to write handlers to avoid the local metadata cache being loaded. + - Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database. +- Add Load Balancer Feature to Liaison. ### Bug Fixes diff --cc banyand/liaison/grpc/server.go index bd7fc591,b12d1839..dadf6ad6 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@@ -171,9 -147,10 +171,10 @@@ func NewServer(_ context.Context, tir1C schemaRegistry: schemaRegistry, }, propertyServer: &propertyServer{ - schemaRegistry: schemaRegistry, - pipeline: tir2Client, - nodeRegistry: nr.PropertyNodeRegistry, + schemaRegistry: schemaRegistry, - pipeline: pipeline, ++ pipeline: tir2Client, + nodeRegistry: nr.PropertyNodeRegistry, - discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.MeasureNodeRegistry), ++ discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.PropertyNodeRegistry, gr), }, propertyRegistryServer: &propertyRegistryServer{ schemaRegistry: schemaRegistry, @@@ -188,15 -163,14 +189,17 @@@ func (s *server) PreRun(_ context.Context) error { s.log = logger.GetLogger("liaison-grpc") - s.streamSVC.setLogger(s.log) + s.streamSVC.setLogger(s.log.Named("stream-t1")) + s.streamCallback.l = s.log.Named("stream-t2") s.measureSVC.setLogger(s.log) + s.propertyServer.SetLogger(s.log) + s.measureCallback.l = s.log.Named("measure-t2") components := []*discoveryService{ s.streamSVC.discoveryService, s.measureSVC.discoveryService, + s.propertyServer.discoveryService, } + s.schemaRepo.RegisterHandler("liaison", schema.KindGroup, s.groupRepo) for _, c := range components { c.SetLogger(s.log) if err := c.initialize(); err != nil { diff --cc pkg/test/setup/setup.go index 3cfc128f,6cd6b7ba..d61a40d3 --- a/pkg/test/setup/setup.go +++ b/pkg/test/setup/setup.go @@@ -258,28 -265,32 +265,34 @@@ func DataNodeWithAddrAndDir(etcdEndpoin } // LiaisonNode runs a liaison node. - func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) { + func LiaisonNode(etcdEndpoint string, flags ...string) (grpcAddr string, closeFn func()) { + grpcAddr, _, closeFn = LiaisonNodeWithHTTP(etcdEndpoint, flags...) + return + } + + // LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses. + func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (grpcAddr, httpAddr string, closeFn func()) { - ports, err := test.AllocateFreePorts(2) + ports, err := test.AllocateFreePorts(3) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - addr := fmt.Sprintf("%s:%d", host, ports[0]) - httpAddr := fmt.Sprintf("%s:%d", host, ports[1]) + grpcAddr = fmt.Sprintf("%s:%d", host, ports[0]) + httpAddr = fmt.Sprintf("%s:%d", host, ports[1]) nodeHost := "127.0.0.1" flags = append(flags, "liaison", "--grpc-host="+host, fmt.Sprintf("--grpc-port=%d", ports[0]), "--http-host="+host, fmt.Sprintf("--http-port=%d", ports[1]), + "--liaison-server-grpc-host="+host, + fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]), - "--http-grpc-addr="+addr, + "--http-grpc-addr="+grpcAddr, "--etcd-endpoints", etcdEndpoint, "--node-host-provider", "flag", "--node-host", nodeHost, ) - closeFn := CMD(flags...) + closeFn = CMD(flags...) gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), testflags.EventuallyTimeout).Should(gomega.Succeed()) gomega.Eventually(func() (map[string]*databasev1.Node, error) { - return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0])) + return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[2])) }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1)) - return addr, closeFn + return }