This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch liaison-lb
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 9b09ef1535cf31f8b4fe55cbca04a480e2d19ef8
Merge: 556635b7 bc60047d
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jun 18 09:02:58 2025 +0800

    Merge remote-tracking branch 'origin/main' into liaison-lb

 CHANGES.md                               |   1 +
 api/proto/banyandb/property/v1/rpc.proto |   3 +
 banyand/liaison/grpc/property.go         | 261 ++++++++++++++++--------
 banyand/liaison/grpc/server.go           |   9 +-
 banyand/property/db.go                   |  13 +-
 banyand/property/listener.go             |  12 +-
 banyand/property/property.go             |  19 +-
 banyand/property/shard.go                | 105 ++++++++--
 bydbctl/internal/cmd/property_test.go    | 327 ++++++++++++++++++++++---------
 docs/api-reference.md                    |   1 +
 pkg/convert/number.go                    |  16 ++
 pkg/convert/number_test.go               |  19 ++
 pkg/index/index.go                       |   4 +-
 pkg/index/inverted/inverted.go           |   4 +
 pkg/index/inverted/inverted_series.go    |   9 +
 pkg/test/setup/setup.go                  |  47 +++--
 ui/README.md                             |   2 +-
 17 files changed, 627 insertions(+), 225 deletions(-)

diff --cc CHANGES.md
index 3a25d826,43cc1e2b..582827ea
--- a/CHANGES.md
+++ b/CHANGES.md
@@@ -14,7 -14,7 +14,8 @@@ Release Notes
  - Replica: Support configurable replica count on Group.
  - Replica: Move the TopN pre-calculation flow from the Data Node to the Liaison Node.
  - Add a wait and retry to write handlers to avoid the local metadata cache being loaded.
+ - Implement versioning properties and replace physical deletion with the tombstone mechanism for the property database.
 +- Add Load Balancer Feature to Liaison. 
  
  ### Bug Fixes
  
diff --cc banyand/liaison/grpc/server.go
index bd7fc591,b12d1839..dadf6ad6
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@@ -171,9 -147,10 +171,10 @@@ func NewServer(_ context.Context, tir1C
                        schemaRegistry: schemaRegistry,
                },
                propertyServer: &propertyServer{
-                       schemaRegistry: schemaRegistry,
-                       pipeline:       tir2Client,
-                       nodeRegistry:   nr.PropertyNodeRegistry,
+                       schemaRegistry:   schemaRegistry,
 -                      pipeline:         pipeline,
++                      pipeline:         tir2Client,
+                       nodeRegistry:     nr.PropertyNodeRegistry,
 -                      discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.MeasureNodeRegistry),
++                      discoveryService: newDiscoveryService(schema.KindProperty, schemaRegistry, nr.PropertyNodeRegistry, gr),
                },
                propertyRegistryServer: &propertyRegistryServer{
                        schemaRegistry: schemaRegistry,
@@@ -188,15 -163,14 +189,17 @@@
  
  func (s *server) PreRun(_ context.Context) error {
        s.log = logger.GetLogger("liaison-grpc")
 -      s.streamSVC.setLogger(s.log)
 +      s.streamSVC.setLogger(s.log.Named("stream-t1"))
 +      s.streamCallback.l = s.log.Named("stream-t2")
        s.measureSVC.setLogger(s.log)
+       s.propertyServer.SetLogger(s.log)
 +      s.measureCallback.l = s.log.Named("measure-t2")
        components := []*discoveryService{
                s.streamSVC.discoveryService,
                s.measureSVC.discoveryService,
+               s.propertyServer.discoveryService,
        }
 +      s.schemaRepo.RegisterHandler("liaison", schema.KindGroup, s.groupRepo)
        for _, c := range components {
                c.SetLogger(s.log)
                if err := c.initialize(); err != nil {
diff --cc pkg/test/setup/setup.go
index 3cfc128f,6cd6b7ba..d61a40d3
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@@ -258,28 -265,32 +265,34 @@@ func DataNodeWithAddrAndDir(etcdEndpoin
  }
  
  // LiaisonNode runs a liaison node.
- func LiaisonNode(etcdEndpoint string, flags ...string) (string, func()) {
+ func LiaisonNode(etcdEndpoint string, flags ...string) (grpcAddr string, closeFn func()) {
+       grpcAddr, _, closeFn = LiaisonNodeWithHTTP(etcdEndpoint, flags...)
+       return
+ }
+ 
+ // LiaisonNodeWithHTTP runs a liaison node with HTTP enabled and returns the gRPC and HTTP addresses.
+ func LiaisonNodeWithHTTP(etcdEndpoint string, flags ...string) (grpcAddr, httpAddr string, closeFn func()) {
 -      ports, err := test.AllocateFreePorts(2)
 +      ports, err := test.AllocateFreePorts(3)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
-       addr := fmt.Sprintf("%s:%d", host, ports[0])
-       httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
+       grpcAddr = fmt.Sprintf("%s:%d", host, ports[0])
+       httpAddr = fmt.Sprintf("%s:%d", host, ports[1])
        nodeHost := "127.0.0.1"
        flags = append(flags, "liaison",
                "--grpc-host="+host,
                fmt.Sprintf("--grpc-port=%d", ports[0]),
                "--http-host="+host,
                fmt.Sprintf("--http-port=%d", ports[1]),
 +              "--liaison-server-grpc-host="+host,
 +              fmt.Sprintf("--liaison-server-grpc-port=%d", ports[2]),
-               "--http-grpc-addr="+addr,
+               "--http-grpc-addr="+grpcAddr,
                "--etcd-endpoints", etcdEndpoint,
                "--node-host-provider", "flag",
                "--node-host", nodeHost,
        )
-       closeFn := CMD(flags...)
+       closeFn = CMD(flags...)
        gomega.Eventually(helpers.HTTPHealthCheck(httpAddr, ""), testflags.EventuallyTimeout).Should(gomega.Succeed())
        gomega.Eventually(func() (map[string]*databasev1.Node, error) {
 -              return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[0]))
 +              return helpers.ListKeys(etcdEndpoint, fmt.Sprintf("/%s/nodes/%s:%d", metadata.DefaultNamespace, nodeHost, ports[2]))
        }, testflags.EventuallyTimeout).Should(gomega.HaveLen(1))
-       return addr, closeFn
+       return
  }

Reply via email to