Copilot commented on code in PR #893:
URL: https://github.com/apache/skywalking-banyandb/pull/893#discussion_r2606681672


##########
test/integration/replication/replication_test.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+	"context"
+	"time"
+
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = g.Describe("Replication", func() {
+	var (
+		conn *grpc.ClientConn
+	)
+
+	g.BeforeEach(func() {
+		var err error
+		conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+			grpc.WithTransportCredentials(insecure.NewCredentials()))
+		gm.Expect(err).NotTo(gm.HaveOccurred())
+	})
+
+	g.AfterEach(func() {
+		if conn != nil {
+			gm.Expect(conn.Close()).To(gm.Succeed())
+		}
+	})
+
+	g.Context("with replicated_group", func() {
+		g.It("should survive node failure", func() {
+			g.By("Verifying the measure exists in replicated_group")
+			ctx := context.Background()
+			measureMetadata := &commonv1.Metadata{
+				Name:  "service_traffic",
+				Group: "replicated_group",
+			}
+
+			schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+			resp, err := schemaClient.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: measureMetadata})
+			gm.Expect(err).NotTo(gm.HaveOccurred())
+			gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil())
+			gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
+
+			g.By("Getting list of all nodes from etcd (includes data nodes + liaison)")
+			nodePath := "/" + metadata.DefaultNamespace + "/nodes"
+			allNodes, err := helpers.ListKeys(etcdEndpoint, nodePath)
+			gm.Expect(err).NotTo(gm.HaveOccurred())
+
+			// We have: 3 data nodes + 1 liaison node = 4 nodes total
+			gm.Expect(len(allNodes)).To(gm.Equal(4),
+				"Should have 4 nodes total (3 data nodes + 1 liaison node), found %d", len(allNodes))
+
+			g.By("Stopping one data node")
+			// We should have 3 data node closers in dataNodeClosers
+			// Stop the first one
+			dataNodeClosers[0]()
+			dataNodeClosers = dataNodeClosers[1:]

Review Comment:
   Mutating the package-level `dataNodeClosers` slice during test execution creates a side effect that affects test isolation. If tests were to run in parallel or be re-run, this mutation could cause unexpected behavior since the slice is permanently modified. Consider creating a local copy of the closers needed for this specific test, or better yet, redesign the approach to avoid modifying shared state.

   For example:
   ```go
   // Create a local copy
   closersToStop := make([]func(), len(dataNodeClosers))
   copy(closersToStop, dataNodeClosers)
   closersToStop[0]()
   ```

   ```suggestion
   			// Create a local copy to avoid mutating the package-level slice
   			closersToStop := make([]func(), len(dataNodeClosers))
   			copy(closersToStop, dataNodeClosers)
   			closersToStop[0]()
   ```


##########
test/integration/replication/replication_test.go:
##########
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+	"context"
+	"time"
+
+	g "github.com/onsi/ginkgo/v2"
+	gm "github.com/onsi/gomega"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+)
+
+var _ = g.Describe("Replication", func() {
+	var (
+		conn *grpc.ClientConn
+	)
+
+	g.BeforeEach(func() {
+		var err error
+		conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second,
+			grpc.WithTransportCredentials(insecure.NewCredentials()))
+		gm.Expect(err).NotTo(gm.HaveOccurred())
+	})
+
+	g.AfterEach(func() {
+		if conn != nil {
+			gm.Expect(conn.Close()).To(gm.Succeed())
+		}
+	})
+
+	g.Context("with replicated_group", func() {
+		g.It("should survive node failure", func() {
+			g.By("Verifying the measure exists in replicated_group")
+			ctx := context.Background()
+			measureMetadata := &commonv1.Metadata{
+				Name:  "service_traffic",
+				Group: "replicated_group",
+			}
+
+			schemaClient := databasev1.NewMeasureRegistryServiceClient(conn)
+			resp, err := schemaClient.Get(ctx, &databasev1.MeasureRegistryServiceGetRequest{Metadata: measureMetadata})
+			gm.Expect(err).NotTo(gm.HaveOccurred())
+			gm.Expect(resp.GetMeasure()).NotTo(gm.BeNil())
+			gm.Expect(resp.GetMeasure().GetMetadata().GetGroup()).To(gm.Equal("replicated_group"))
+
+			g.By("Getting list of all nodes from etcd (includes data nodes + liaison)")
+			nodePath := "/" + metadata.DefaultNamespace + "/nodes"
+			allNodes, err := helpers.ListKeys(etcdEndpoint, nodePath)
+			gm.Expect(err).NotTo(gm.HaveOccurred())
+
+			// We have: 3 data nodes + 1 liaison node = 4 nodes total
+			gm.Expect(len(allNodes)).To(gm.Equal(4),
+				"Should have 4 nodes total (3 data nodes + 1 liaison node), found %d", len(allNodes))
+
+			g.By("Stopping one data node")
+			// We should have 3 data node closers in dataNodeClosers
+			// Stop the first one
+			dataNodeClosers[0]()
+			dataNodeClosers = dataNodeClosers[1:]

Review Comment:
   The `dataNodeClosers` variable is accessed in the test but is not properly shared across test processes. In Ginkgo's `SynchronizedBeforeSuite`, the first function runs once on process 1 and returns serialized data to other processes via the second function. Package-level variables set in the first function are not accessible in tests running in other processes. The `dataNodeClosers` slice should be serialized and passed through the suite configuration (like `liaisonAddr`, `etcdEndpoint`, and `now`), but since closers are functions that cannot be serialized, the test architecture needs to be reconsidered. One solution is to avoid stopping nodes in parallel tests, or ensure this specific test doesn't run in parallel mode. Another approach is to store node information (like node IDs) instead of closers and implement a shutdown mechanism through the test framework.


##########
test/integration/replication/replication_suite_test.go:
##########
@@ -0,0 +1,181 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package replication_test
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"testing"
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	"github.com/onsi/gomega/gleak"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+	"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/pool"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
+	"github.com/apache/skywalking-banyandb/pkg/test/helpers"
+	test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+	test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
+	"github.com/apache/skywalking-banyandb/pkg/test/setup"
+	test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+	test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+	test_cases "github.com/apache/skywalking-banyandb/test/cases"
+	casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+)
+
+func TestReplication(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Replication Suite")
+}
+
+var (
+	deferFunc       func()
+	goods           []gleak.Goroutine
+	now             time.Time
+	connection      *grpc.ClientConn
+	liaisonAddr     string
+	etcdEndpoint    string
+	dataNodeClosers []func()
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+	Expect(logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: flags.LogLevel,
+	})).To(Succeed())
+	pool.EnableStackTracking(true)
+	goods = gleak.Goroutines()
+
+	By("Starting etcd server")
+	ports, err := test.AllocateFreePorts(2)
+	Expect(err).NotTo(HaveOccurred())
+	dir, spaceDef, err := test.NewSpace()
+	Expect(err).NotTo(HaveOccurred())
+	ep := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+	etcdEndpoint = ep
+
+	server, err := embeddedetcd.NewServer(
+		embeddedetcd.ConfigureListener([]string{ep}, []string{fmt.Sprintf("http://127.0.0.1:%d", ports[1])}),
+		embeddedetcd.RootDir(dir),
+		embeddedetcd.AutoCompactionMode("periodic"),
+		embeddedetcd.AutoCompactionRetention("1h"),
+		embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+	)
+	Expect(err).ShouldNot(HaveOccurred())
+	<-server.ReadyNotify()
+
+	By("Loading schema")
+	schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+		schema.Namespace(metadata.DefaultNamespace),
+		schema.ConfigureServerEndpoints([]string{ep}),
+	)
+	Expect(err).NotTo(HaveOccurred())
+	defer schemaRegistry.Close()
+
+	ctx := context.Background()
+	// Preload all schemas since test_cases.Initialize needs them
+	test_stream.PreloadSchema(ctx, schemaRegistry)
+	test_measure.PreloadSchema(ctx, schemaRegistry)
+	test_trace.PreloadSchema(ctx, schemaRegistry)
+	test_property.PreloadSchema(ctx, schemaRegistry)
+
+	By("Starting 3 data nodes for replication test")
+	dataNodeClosers = make([]func(), 0, 3)
+
+	for i := 0; i < 3; i++ {
+		closeDataNode := setup.DataNode(ep)
+		dataNodeClosers = append(dataNodeClosers, closeDataNode)
+	}
+
+	By("Starting liaison node")
+	liaisonAddr, closerLiaisonNode := setup.LiaisonNode(ep)
+
+	By("Initializing test cases")
+	ns := timestamp.NowMilli().UnixNano()
+	now = time.Unix(0, ns-ns%int64(time.Minute))
+
+	// Initialize test data - this loads data for all test types
+	test_cases.Initialize(liaisonAddr, now)
+
+	deferFunc = func() {
+		closerLiaisonNode()
+		for _, closeDataNode := range dataNodeClosers {
+			closeDataNode()
+		}

Review Comment:
   The cleanup logic in `deferFunc` will attempt to close data nodes that may have already been stopped during test execution (lines 83-84 in replication_test.go). When a test calls `dataNodeClosers[0]()` and then removes it from the slice, the `deferFunc` will still iterate over the remaining closers, but if tests fail or multiple tests run, this could lead to attempting to close already-closed nodes or missing nodes that should be closed. Consider implementing idempotent close operations or tracking which nodes have been closed to avoid double-closure issues. Additionally, ensure that node closures in tests don't interfere with the cleanup process.


##########
pkg/test/measure/testdata/groups/replicated.json:
##########
@@ -0,0 +1,18 @@
+{
+  "metadata": {
+    "name": "replicated_group"
+  },
+  "catalog": "CATALOG_MEASURE",
+  "resource_opts": {
+    "shard_num": 2,
+    "segment_interval": {
+      "unit": "UNIT_DAY",
+      "num": 1
+    },
+    "ttl": {
+      "unit": "UNIT_DAY",
+      "num": 7
+    },
+    "replicas": 2
+  }

Review Comment:
   The `updated_at` field is missing from this group configuration, which is inconsistent with other group JSON files in the same directory (e.g., `index_mode.json`, `exception.json`, `sw_metric.json`). While it may not be strictly required, consistency across test data files is important for maintainability. Consider adding:
   ```json
   "updated_at": "2021-04-15T01:30:15.01Z"
   ```
   after the `resource_opts` field to match the pattern used in other group files.

   ```suggestion
     },
     "updated_at": "2021-04-15T01:30:15.01Z"
   ```


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
