hanahmily commented on code in PR #698:
URL:
https://github.com/apache/skywalking-banyandb/pull/698#discussion_r2218009497
##########
banyand/gossip/client.go:
##########
Review Comment:
Move the gossip pkg into `property`
##########
pkg/cmdsetup/standalone.go:
##########
@@ -50,7 +51,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
}
metricSvc := observability.NewMetricService(metaSvc, dataPipeline,
"standalone", nil)
pm := protector.NewMemory(metricSvc)
- propertySvc, err := property.NewService(metaSvc, dataPipeline,
metricSvc, pm)
+ propertySvc, err := property.NewService(metaSvc, dataPipeline,
metricSvc, pm, gossip.NewLocalMessenger())
Review Comment:
The standalone doesn't need repair since it is in this mode.
##########
pkg/cmdsetup/data.go:
##########
@@ -49,7 +52,9 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
metricSvc := observability.NewMetricService(metaSvc, metricsPipeline,
"data", nil)
pm := protector.NewMemory(metricSvc)
pipeline := sub.NewServer(metricSvc)
- propertySvc, err := property.NewService(metaSvc, pipeline, metricSvc,
pm)
+ client := pub.New(metaSvc, databasev1.Role_ROLE_DATA)
+ messenger := gossip.NewMessenger(metricSvc, client, pipeline)
Review Comment:
The gossip server should be initialized within the `property` module, as it
is not a top-level module.
##########
banyand/gossip/client.go:
##########
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gossip
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ gossipv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/gossip/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+func (s *service) Propagation(nodes []string, topic string, message
bus.Message) (Future, error) {
+ if len(nodes) < 2 {
+ return nil, fmt.Errorf("must provide at least 2 node")
+ }
+
+ // building propagation context
+ ctx := &gossipv1.PropagationContext{
+ Nodes: nodes,
+ OriginNode: s.nodeID,
+ OriginMessageId: uint64(message.ID()),
+ }
+ if len(nodes) == 2 {
+ ctx.MaxPropagationCount = 1
+ } else {
+ // two rounds of all nodes except the lasted node
+ // such when there have three nodes A, B, C,
+ // the propagation will be A -> B, B -> C, C -> A, A -> B
+ ctx.MaxPropagationCount = int32(len(nodes)*2 - 3)
+ }
+
+ // building propagation message request
+ d, ok := message.Data().(proto.Message)
+ if !ok {
+ return nil, fmt.Errorf("invalid message type %T",
message.Data())
+ }
+ payload, err := proto.Marshal(d)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal message %T: %w", d,
err)
+ }
+ request := &gossipv1.PropagationMessageRequest{
+ Topic: topic,
+ MessageId: uint64(message.ID()),
+ Context: ctx,
+ Body: payload,
+ }
+
+ totalTimeout := time.Second * 15 *
time.Duration(ctx.MaxPropagationCount)
Review Comment:
The timeout is independent of the MaxPropagationCount.
##########
banyand/gossip/client.go:
##########
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gossip
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ gossipv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/gossip/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+func (s *service) Propagation(nodes []string, topic string, message
bus.Message) (Future, error) {
+ if len(nodes) < 2 {
+ return nil, fmt.Errorf("must provide at least 2 node")
+ }
+
+ // building propagation context
+ ctx := &gossipv1.PropagationContext{
+ Nodes: nodes,
+ OriginNode: s.nodeID,
+ OriginMessageId: uint64(message.ID()),
+ }
+ if len(nodes) == 2 {
+ ctx.MaxPropagationCount = 1
+ } else {
+ // two rounds of all nodes except the lasted node
+ // such when there have three nodes A, B, C,
+ // the propagation will be A -> B, B -> C, C -> A, A -> B
+ ctx.MaxPropagationCount = int32(len(nodes)*2 - 3)
+ }
Review Comment:
Why use two branches? `2 * 2 - 3 != 1` ?
##########
pkg/cmdsetup/data.go:
##########
@@ -49,7 +52,9 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
metricSvc := observability.NewMetricService(metaSvc, metricsPipeline,
"data", nil)
pm := protector.NewMemory(metricSvc)
pipeline := sub.NewServer(metricSvc)
- propertySvc, err := property.NewService(metaSvc, pipeline, metricSvc,
pm)
+ client := pub.New(metaSvc, databasev1.Role_ROLE_DATA)
Review Comment:
You should avoid using the `pub` package. Its client may reconnect to the
node, which is not needed by the gossip protocol. According to the design,
gossip should stop syncing when the connection is lost.
##########
banyand/gossip/client.go:
##########
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gossip
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ gossipv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/gossip/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+func (s *service) Propagation(nodes []string, topic string, message
bus.Message) (Future, error) {
+ if len(nodes) < 2 {
+ return nil, fmt.Errorf("must provide at least 2 node")
+ }
+
+ // building propagation context
+ ctx := &gossipv1.PropagationContext{
+ Nodes: nodes,
+ OriginNode: s.nodeID,
+ OriginMessageId: uint64(message.ID()),
+ }
+ if len(nodes) == 2 {
+ ctx.MaxPropagationCount = 1
+ } else {
+ // two rounds of all nodes except the lasted node
+ // such when there have three nodes A, B, C,
+ // the propagation will be A -> B, B -> C, C -> A, A -> B
+ ctx.MaxPropagationCount = int32(len(nodes)*2 - 3)
+ }
+
+ // building propagation message request
+ d, ok := message.Data().(proto.Message)
+ if !ok {
+ return nil, fmt.Errorf("invalid message type %T",
message.Data())
+ }
+ payload, err := proto.Marshal(d)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal message %T: %w", d,
err)
+ }
+ request := &gossipv1.PropagationMessageRequest{
+ Topic: topic,
+ MessageId: uint64(message.ID()),
+ Context: ctx,
+ Body: payload,
+ }
+
+ totalTimeout := time.Second * 15 *
time.Duration(ctx.MaxPropagationCount)
+
+ var sendMsg bus.Message
+ var sendTo queue.Client
+ if nodes[0] == s.nodeID {
Review Comment:
This scenario should not happen: In a node, there are no two replicas for an
identical shard.
##########
banyand/gossip/client.go:
##########
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gossip
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ gossipv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/gossip/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+func (s *service) Propagation(nodes []string, topic string, message
bus.Message) (Future, error) {
+ if len(nodes) < 2 {
+ return nil, fmt.Errorf("must provide at least 2 node")
+ }
+
+ // building propagation context
+ ctx := &gossipv1.PropagationContext{
+ Nodes: nodes,
+ OriginNode: s.nodeID,
+ OriginMessageId: uint64(message.ID()),
+ }
+ if len(nodes) == 2 {
+ ctx.MaxPropagationCount = 1
+ } else {
+ // two rounds of all nodes except the lasted node
+ // such when there have three nodes A, B, C,
+ // the propagation will be A -> B, B -> C, C -> A, A -> B
+ ctx.MaxPropagationCount = int32(len(nodes)*2 - 3)
+ }
+
+ // building propagation message request
+ d, ok := message.Data().(proto.Message)
+ if !ok {
+ return nil, fmt.Errorf("invalid message type %T",
message.Data())
+ }
+ payload, err := proto.Marshal(d)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal message %T: %w", d,
err)
+ }
+ request := &gossipv1.PropagationMessageRequest{
+ Topic: topic,
+ MessageId: uint64(message.ID()),
+ Context: ctx,
+ Body: payload,
+ }
+
+ totalTimeout := time.Second * 15 *
time.Duration(ctx.MaxPropagationCount)
+
+ var sendMsg bus.Message
+ var sendTo queue.Client
+ if nodes[0] == s.nodeID {
+ sendMsg = bus.NewMessage(message.ID(), request)
+ sendTo = s.local
+ } else {
+ sendMsg = bus.NewMessageWithNode(message.ID(), nodes[0],
request)
Review Comment:
If this function always gets the first node, how about using a node string
instead of a slice?
##########
banyand/gossip/client.go:
##########
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gossip
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ gossipv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/gossip/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+func (s *service) Propagation(nodes []string, topic string, message
bus.Message) (Future, error) {
+ if len(nodes) < 2 {
+ return nil, fmt.Errorf("must provide at least 2 node")
+ }
+
+ // building propagation context
+ ctx := &gossipv1.PropagationContext{
+ Nodes: nodes,
+ OriginNode: s.nodeID,
+ OriginMessageId: uint64(message.ID()),
+ }
+ if len(nodes) == 2 {
+ ctx.MaxPropagationCount = 1
+ } else {
+ // two rounds of all nodes except the lasted node
+ // such when there have three nodes A, B, C,
+ // the propagation will be A -> B, B -> C, C -> A, A -> B
+ ctx.MaxPropagationCount = int32(len(nodes)*2 - 3)
+ }
+
+ // building propagation message request
+ d, ok := message.Data().(proto.Message)
+ if !ok {
+ return nil, fmt.Errorf("invalid message type %T",
message.Data())
+ }
+ payload, err := proto.Marshal(d)
+ if err != nil {
+ return nil, fmt.Errorf("failed to marshal message %T: %w", d,
err)
+ }
+ request := &gossipv1.PropagationMessageRequest{
+ Topic: topic,
+ MessageId: uint64(message.ID()),
+ Context: ctx,
+ Body: payload,
+ }
+
+ totalTimeout := time.Second * 15 *
time.Duration(ctx.MaxPropagationCount)
+
+ var sendMsg bus.Message
+ var sendTo queue.Client
+ if nodes[0] == s.nodeID {
+ sendMsg = bus.NewMessage(message.ID(), request)
+ sendTo = s.local
+ } else {
+ sendMsg = bus.NewMessageWithNode(message.ID(), nodes[0],
request)
Review Comment:
It's not a message that is used in a pipeline. You don't have to use the
`bus` pkg.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]