Copilot commented on code in PR #893:
URL: https://github.com/apache/skywalking-banyandb/pull/893#discussion_r2606681672


##########
test/integration/replication/replication_test.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+       "context"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = g.Describe("Replication", func() {
+       var (
+               conn *grpc.ClientConn
+       )
+
+       g.BeforeEach(func() {
+               var err error
+               conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+                       grpc.WithTransportCredentials(insecure.NewCredentials()))
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+       })
+
+       g.AfterEach(func() {
+               if conn != nil {
+                       gm.Expect(conn.Close()).To(gm.Succeed())
+               }
+       })
+
+       g.Context("with replicated_group", func() {
+               g.It("should survive node failure", func() {
+                       g.By("Verifying the measure exists in replicated_group")
+                       ctx := context.Background()
+                       measureMetadata := &commonv1.Metadata{
+                               Name:  "service_traffic",
+                               Group: "replicated_group",
+                       }
+
+                       schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+                       resp, err := schemaClient.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: measureMetadata})
+                       gm.Expect(err).NotTo(gm.HaveOccurred())
+                       gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil())
+                       gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
+
+                       g.By("Getting list of all nodes from etcd (includes data nodes + liaison)")
+                       nodePath := "/" + metadata.DefaultNamespace + "/nodes"
+                       allNodes, err := helpers.ListKeys(etcdEndpoint, nodePath)
+                       gm.Expect(err).NotTo(gm.HaveOccurred())
+
+                       // We have: 3 data nodes + 1 liaison node = 4 nodes total
+                       gm.Expect(len(allNodes)).To(gm.Equal(4),
+                               "Should have 4 nodes total (3 data nodes + 1 liaison node), found %d", len(allNodes))
+
+                       g.By("Stopping one data node")
+                       // We should have 3 data node closers in dataNodeClosers
+                       // Stop the first one
+                       dataNodeClosers[0]()
+                       dataNodeClosers = dataNodeClosers[1:]

Review Comment:
   Re-slicing the package-level `dataNodeClosers` during test execution 
permanently changes state shared by every spec in the suite, which breaks test 
isolation. If specs run in parallel, are retried, or are reordered, later code 
sees a slice that no longer contains the first closer.
   
   Consider creating a local copy of the closers needed for this specific test, 
or better yet, redesign the approach to avoid modifying shared state. For 
example:
   ```go
   // Create a local copy
   closersToStop := make([]func(), len(dataNodeClosers))
   copy(closersToStop, dataNodeClosers)
   closersToStop[0]()
   ```
   ```suggestion
                        // Create a local copy to avoid mutating the package-level slice
                        closersToStop := make([]func(), len(dataNodeClosers))
                        copy(closersToStop, dataNodeClosers)
                        closersToStop[0]()
   ```



##########
test/integration/replication/replication_test.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+       "context"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = g.Describe("Replication", func() {
+       var (
+               conn *grpc.ClientConn
+       )
+
+       g.BeforeEach(func() {
+               var err error
+               conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+                       grpc.WithTransportCredentials(insecure.NewCredentials()))
+               gm.Expect(err).NotTo(gm.HaveOccurred())
+       })
+
+       g.AfterEach(func() {
+               if conn != nil {
+                       gm.Expect(conn.Close()).To(gm.Succeed())
+               }
+       })
+
+       g.Context("with replicated_group", func() {
+               g.It("should survive node failure", func() {
+                       g.By("Verifying the measure exists in replicated_group")
+                       ctx := context.Background()
+                       measureMetadata := &commonv1.Metadata{
+                               Name:  "service_traffic",
+                               Group: "replicated_group",
+                       }
+
+                       schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+                       resp, err := schemaClient.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: measureMetadata})
+                       gm.Expect(err).NotTo(gm.HaveOccurred())
+                       gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil())
+                       gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
+
+                       g.By("Getting list of all nodes from etcd (includes data nodes + liaison)")
+                       nodePath := "/" + metadata.DefaultNamespace + "/nodes"
+                       allNodes, err := helpers.ListKeys(etcdEndpoint, nodePath)
+                       gm.Expect(err).NotTo(gm.HaveOccurred())
+
+                       // We have: 3 data nodes + 1 liaison node = 4 nodes total
+                       gm.Expect(len(allNodes)).To(gm.Equal(4),
+                               "Should have 4 nodes total (3 data nodes + 1 liaison node), found %d", len(allNodes))
+
+                       g.By("Stopping one data node")
+                       // We should have 3 data node closers in dataNodeClosers
+                       // Stop the first one
+                       dataNodeClosers[0]()
+                       dataNodeClosers = dataNodeClosers[1:]

Review Comment:
   The `dataNodeClosers` variable is accessed in the test but is not properly 
shared across test processes. In Ginkgo's `SynchronizedBeforeSuite`, the first 
function runs once on process 1 and returns serialized data to other processes 
via the second function. Package-level variables set in the first function are 
not accessible in tests running in other processes.
   
   Values such as `liaisonAddr`, `etcdEndpoint`, and `now` can be serialized and 
passed through the suite configuration, but closers are functions and cannot be 
serialized, so `dataNodeClosers` cannot be shared that way and the test 
architecture needs to be reconsidered.
   
   One solution is to avoid stopping nodes in parallel tests, or ensure this 
specific test doesn't run in parallel mode. Another approach is to store node 
information (like node IDs) instead of closers and implement a shutdown 
mechanism through the test framework.
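   
   As a rough sketch of the constraint described above (not code from this PR): plain values survive the `[]byte` handoff between the two `SynchronizedBeforeSuite` functions, while a closer does not. The `suiteConfig` struct, its field names, and the sample addresses are hypothetical; only the standard library is used.
   ```go
   package main

   import (
   	"encoding/json"
   	"fmt"
   	"time"
   )

   // suiteConfig is a hypothetical container for values that can cross
   // process boundaries. Closers cannot be added here: encoding/json
   // rejects func-typed fields.
   type suiteConfig struct {
   	LiaisonAddr  string    `json:"liaison_addr"`
   	EtcdEndpoint string    `json:"etcd_endpoint"`
   	Now          time.Time `json:"now"`
   }

   // encodeConfig stands in for the first function, which runs on process 1 only.
   func encodeConfig(c suiteConfig) ([]byte, error) {
   	return json.Marshal(c)
   }

   // decodeConfig stands in for the second function, which runs on every process.
   func decodeConfig(data []byte) (suiteConfig, error) {
   	var c suiteConfig
   	err := json.Unmarshal(data, &c)
   	return c, err
   }

   func main() {
   	in := suiteConfig{
   		LiaisonAddr:  "127.0.0.1:17912",       // sample value
   		EtcdEndpoint: "http://127.0.0.1:2379", // sample value
   		Now:          time.Unix(1700000000, 0).UTC(),
   	}
   	data, err := encodeConfig(in)
   	if err != nil {
   		panic(err)
   	}
   	out, err := decodeConfig(data)
   	if err != nil {
   		panic(err)
   	}
   	fmt.Println("round trip ok:", out.LiaisonAddr == in.LiaisonAddr && out.Now.Equal(in.Now))

   	// A function value cannot take the same route: Marshal returns an error.
   	_, err = json.Marshal(struct{ Close func() }{Close: func() {}})
   	fmt.Println("marshalling a closer failed:", err != nil)
   }
   ```
   Because the closers have to stay on the process that created them, any spec that stops a node would need to run on that same process, or stop the node by some other means.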



##########
test/integration/replication/replication_suite_test.go:
##########
@@ -0,0 +1,181 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+)
+
+func TestReplication(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Replication Suite")
+}
+
+var (
+       deferFunc       func()
+       goods           []gleak.Goroutine
+       now             time.Time
+       connection      *grpc.ClientConn
+       liaisonAddr     string
+       etcdEndpoint    string
+       dataNodeClosers []func()
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       pool.EnableStackTracking(true)
+       goods = gleak.Goroutines()
+
+       By("Starting etcd server")
+       ports, err := test.AllocateFreePorts(2)
+       Expect(err).NotTo(HaveOccurred())
+       dir, spaceDef, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       etcdEndpoint = ep
+
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       Expect(err).ShouldNot(HaveOccurred())
+       <-server.ReadyNotify()
+
+       By("Loading schema")
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{ep}),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       defer schemaRegistry.Close()
+
+       ctx := context.Background()
+       // Preload all schemas since test_cases.Initialize needs them
+       test_stream.PreloadSchema(ctx, schemaRegistry)
+       test_measure.PreloadSchema(ctx, schemaRegistry)
+       test_trace.PreloadSchema(ctx, schemaRegistry)
+       test_property.PreloadSchema(ctx, schemaRegistry)
+
+       By("Starting 3 data nodes for replication test")
+       dataNodeClosers = make([]func(), 0, 3)
+
+       for i := 0; i < 3; i++ {
+               closeDataNode := setup.DataNode(ep)
+               dataNodeClosers = append(dataNodeClosers, closeDataNode)
+       }
+
+       By("Starting liaison node")
+       liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+
+       By("Initializing test cases")
+       ns := timestamp.NowMilli().UnixNano()
+       now = time.Unix(0, ns-ns%int64(time.Minute))
+
+       // Initialize test data - this loads data for all test types
+       test_cases.Initialize(liaisonAddr, now)
+
+       deferFunc = func() {
+               closerLiaisonNode()
+               for _, closeDataNode := range dataNodeClosers {
+                       closeDataNode()
+               }

Review Comment:
   The cleanup logic in `deferFunc` depends on state that the test changes 
(lines 83-84 in replication_test.go). The test calls `dataNodeClosers[0]()` and 
then removes that closer from the slice, so `deferFunc` iterates only over the 
remaining closers. If a test stops a node without removing its closer, cleanup 
could try to close an already-closed node; if the slice is trimmed without the 
closer being called, cleanup could miss a node that should be closed.
   
   Consider implementing idempotent close operations or tracking which nodes 
have been closed to avoid double-closure issues. Additionally, ensure that node 
closures in tests don't interfere with the cleanup process.
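   
   A minimal sketch of an idempotent closer, assuming the closers returned by `setup.DataNode` are plain `func()` values as the diff suggests. The `idempotent` helper is hypothetical and not part of the PR or the test framework.
   ```go
   package main

   import (
   	"fmt"
   	"sync"
   )

   // idempotent wraps a closer so that only the first call runs it.
   // Later calls are no-ops, so a test and the suite cleanup can both
   // call the same closer safely.
   func idempotent(closer func()) func() {
   	var once sync.Once
   	return func() { once.Do(closer) }
   }

   func main() {
   	calls := 0
   	closers := make([]func(), 0, 3)
   	for i := 0; i < 3; i++ {
   		// Stand-in for wrapping the real data node closer.
   		closers = append(closers, idempotent(func() { calls++ }))
   	}

   	closers[0]() // the test stops one node, without touching the slice

   	for _, c := range closers { // suite cleanup closes everything
   		c()
   	}
   	fmt.Println("underlying closers invoked:", calls)
   }
   ```
   With this shape the test no longer needs to remove the closer from the slice, and the cleanup loop can stay as it is.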



##########
pkg/test/measure/testdata/groups/replicated.json:
##########
@@ -0,0 +1,18 @@
+{
+  "metadata": {
+    "name": "replicated_group"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    },
+    "replicas": 2
+  }

Review Comment:
   The `updated_at` field is missing from this group configuration, which is 
inconsistent with other group JSON files in the same directory (e.g., 
`index_mode.json`, `exception.json`, `sw_metric.json`). While it may not be 
strictly required, consistency across test data files is important for 
maintainability. Consider adding:
   ```json
   "updated_at": "2021-04-15T01:30:15.01Z"
   ```
   after the `resource_opts` field to match the pattern used in other group 
files.
   ```suggestion
     },
     "updated_at": "2021-04-15T01:30:15.01Z"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
