Copilot commented on code in PR #963:
URL: https://github.com/apache/skywalking-banyandb/pull/963#discussion_r2752345206


##########
fodc/proxy/internal/api/server.go:
##########
@@ -98,7 +107,7 @@ func (s *Server) handleMetrics(w http.ResponseWriter, r *http.Request) {
        }
 
        filter := &metrics.Filter{
-               Role:    r.URL.Query().Get("role"),
+               Role:    r.URL.Query().Get("node_role"),

Review Comment:
   The query parameter name changed from `role` to `node_role` for filtering 
metrics. This is a breaking change to the API that could affect existing 
clients. If this is intentional, it should be documented in the PR description 
or a migration guide should be provided. Otherwise, consider maintaining 
backward compatibility by supporting both parameter names.
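   
   One way to keep existing clients working is to accept the old name as a fallback. A minimal sketch only, assuming the handler shape shown in this diff; `roleFromQuery` is a hypothetical helper, not part of the PR:
   ```go
   package api // hypothetical: the package containing handleMetrics
   
   import "net/url"
   
   // roleFromQuery prefers the new "node_role" parameter and falls back to the
   // legacy "role" parameter so existing clients keep working.
   func roleFromQuery(q url.Values) string {
           if v := q.Get("node_role"); v != "" {
                   return v
           }
           return q.Get("role")
   }
   ```
   The filter would then be built with `Role: roleFromQuery(r.URL.Query())`, and the legacy `role` parameter could be documented as deprecated.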



##########
fodc/proxy/internal/grpc/service.go:
##########
@@ -215,8 +248,6 @@ func (s *FODCService) StreamMetrics(stream fodcv1.FODCService_StreamMetricsServe
                return status.Errorf(codes.Unauthenticated, "agent ID not found in context or peer address")
        }
 

Review Comment:
   The `defer s.cleanupConnection(agentID)` statement was removed from 
`StreamMetrics`. This means connections are no longer cleaned up when the 
metrics stream ends. This could lead to stale connections accumulating in the 
`connections` map, causing a resource leak. Connections should be cleaned up 
when streams terminate, or there should be a separate mechanism to handle 
cleanup.
   ```suggestion
   
        defer s.cleanupConnection(agentID)
   ```
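   
   If the removal was intentional and cleanup is meant to live outside the handler body, an alternative is to tie it to the stream's lifecycle. A rough sketch only, assuming the `cleanupConnection` helper and the `stream`/`agentID` values shown above:
   ```go
   // Remove the connection entry once the stream's context is done, which
   // covers both graceful termination and abrupt client disconnects.
   go func() {
           <-stream.Context().Done()
           s.cleanupConnection(agentID)
   }()
   ```
   Either way, something must remove entries from the `connections` map when a stream ends.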



##########
fodc/agent/internal/proxy/client_test.go:
##########
@@ -616,10 +720,156 @@ func TestProxyClient_StartMetricsStream_StreamError(t *testing.T) {
        assert.Contains(t, err.Error(), "failed to create metrics stream")
 }
 
+func TestProxyClient_StartClusterStateStream_NotConnected(t *testing.T) {
+       testLogger := initTestLogger(t)
+       fr := flightrecorder.NewFlightRecorder(1000000)
+       pc := NewProxyClient("localhost:8080", "datanode-hot", "192.168.1.1", []string{"data"}, nil, 5*time.Second, 10*time.Second, fr, nil, testLogger)
+
+       ctx := context.Background()
+       err := pc.StartClusterStateStream(ctx)
+
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "client not connected")
+}
+
+func TestProxyClient_StartClusterStateStream_NoAgentID(t *testing.T) {
+       testLogger := initTestLogger(t)
+       fr := flightrecorder.NewFlightRecorder(1000000)
+       pc := NewProxyClient("localhost:8080", "datanode-hot", "192.168.1.1", []string{"data"}, nil, 5*time.Second, 10*time.Second, fr, nil, testLogger)
+
+       mockClient := &mockFODCServiceClient{}
+       pc.streamsMu.Lock()
+       pc.client = mockClient
+       pc.streamsMu.Unlock()
+
+       ctx := context.Background()
+       err := pc.StartClusterStateStream(ctx)
+
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "agent ID not available")
+}
+
+func TestProxyClient_StartClusterStateStream_Success(t *testing.T) {
+       testLogger := initTestLogger(t)
+       fr := flightrecorder.NewFlightRecorder(1000000)
+       pc := NewProxyClient("localhost:8080", "datanode-hot", "192.168.1.1", []string{"data"}, nil, 5*time.Second, 10*time.Second, fr, nil, testLogger)
+
+       ctx := context.Background()
+       mockClient := &mockFODCServiceClient{}
+       mockStream := newMockStreamClusterStateClient(ctx)
+
+       mockClient.streamClusterStateFunc = func(ctxParam context.Context, _ ...grpc.CallOption) (fodcv1.FODCService_StreamClusterTopologyClient, error) {
+               // Verify metadata contains agent_id
+               md, ok := metadata.FromOutgoingContext(ctxParam)
+               require.True(t, ok)
+               agentIDs := md.Get("agent_id")
+               require.Len(t, agentIDs, 1)
+               assert.Equal(t, testAgentID, agentIDs[0])
+               return mockStream, nil
+       }
+
+       pc.streamsMu.Lock()
+       pc.client = mockClient
+       pc.agentID = testAgentID
+       pc.streamsMu.Unlock()
+
+       err := pc.StartClusterStateStream(ctx)
+
+       require.NoError(t, err)
+       pc.streamsMu.RLock()
+       clusterStateStream := pc.clusterStateStream
+       pc.streamsMu.RUnlock()
+       assert.NotNil(t, clusterStateStream)
+}
+
+func TestProxyClient_StartClusterStateStream_StreamError(t *testing.T) {
+       testLogger := initTestLogger(t)
+       fr := flightrecorder.NewFlightRecorder(1000000)
+       pc := NewProxyClient("localhost:8080", "datanode-hot", "192.168.1.1", []string{"data"}, nil, 5*time.Second, 10*time.Second, fr, nil, testLogger)
+
+       ctx := context.Background()
+       mockClient := &mockFODCServiceClient{}
+       mockClient.streamClusterStateFunc = func(_ context.Context, _ ...grpc.CallOption) (fodcv1.FODCService_StreamClusterTopologyClient, error) {
+               return nil, errors.New("stream creation failed")
+       }
+
+       pc.streamsMu.Lock()
+       pc.client = mockClient
+       pc.agentID = testAgentID
+       pc.streamsMu.Unlock()
+
+       err := pc.StartClusterStateStream(ctx)
+
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "failed to create cluster state stream")
+}
+
+func TestProxyClient_SendClusterTopology_NoStream(t *testing.T) {
+       testLogger := initTestLogger(t)
+       fr := flightrecorder.NewFlightRecorder(1000000)
+       pc := NewProxyClient("localhost:8080", "datanode-hot", "192.168.1.1", []string{"data"}, nil, 5*time.Second, 10*time.Second, fr, nil, testLogger)
+
+       err := pc.sendClusterTopology()
+
+       assert.Error(t, err)
+       assert.Contains(t, err.Error(), "cluster state stream not established")
+}
+
+func TestProxyClient_SendClusterTopology_Success(t *testing.T) {
+       testLogger := initTestLogger(t)
+       fr := flightrecorder.NewFlightRecorder(1000000)
+       collector := cluster.NewCollector(testLogger, []string{"localhost:17914"}, 30*time.Second, "test-pod")
+
+       // Populate collector with test topology data using unsafe to access private field
+       testNode := &databasev1.Node{
+               Metadata:    &commonv1.Metadata{Name: "test-node"},
+               GrpcAddress: "localhost:17913",
+               Roles:       []databasev1.Role{databasev1.Role_ROLE_DATA},
+       }
+       testTopology := cluster.TopologyMap{
+               Nodes: []*databasev1.Node{testNode},
+               Calls: []*fodcv1.Call{
+                       {
+                               Id:     "test-call-1",
+                               Source: "test-node",
+                               Target: "other-node",
+                       },
+               },
+       }
+       // Use unsafe to set the private clusterTopology field
+       collectorValue := reflect.ValueOf(collector).Elem()
+       clusterTopologyField := collectorValue.FieldByName("clusterTopology")
+       if clusterTopologyField.IsValid() {
+               // Get the address of the field and use unsafe to set it
+               fieldPtr := unsafe.Pointer(clusterTopologyField.UnsafeAddr())
+               *(*cluster.TopologyMap)(fieldPtr) = testTopology
+       }

Review Comment:
   Using `unsafe` and `reflect` to access and modify private fields in tests is a code smell that indicates the API design may need improvement. Consider one of the following (option 3 is sketched below):
   1. Adding a public method to set topology data for testing purposes
   2. Making the field accessible through a testing-specific build tag
   3. Refactoring to use dependency injection with an interface
   
   This approach makes tests brittle and bypasses Go's encapsulation.
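   
   A rough sketch of option 3; the `topologyProvider` and `fakeCollector` names are hypothetical, and only `GetClusterTopology` and `cluster.TopologyMap` are taken from this PR:
   ```go
   // topologyProvider is the narrow interface the proxy client could depend on
   // instead of the concrete *cluster.Collector.
   type topologyProvider interface {
           GetClusterTopology() cluster.TopologyMap
   }
   
   // fakeCollector is a test double that returns a canned topology.
   type fakeCollector struct {
           topology cluster.TopologyMap
   }
   
   func (f *fakeCollector) GetClusterTopology() cluster.TopologyMap {
           return f.topology
   }
   ```
   The test would then construct the client with `&fakeCollector{topology: testTopology}` and drop the `reflect`/`unsafe` block entirely.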



##########
fodc/agent/internal/proxy/client.go:
##########
@@ -230,6 +235,116 @@ func (c *Client) StartMetricsStream(ctx context.Context) error {
        return nil
 }
 
+// StartClusterStateStream establishes bi-directional cluster state stream with Proxy.
+func (c *Client) StartClusterStateStream(ctx context.Context) error {
+       c.streamsMu.Lock()
+       if c.client == nil {
+               c.streamsMu.Unlock()
+               return fmt.Errorf("client not connected, call Connect() first")
+       }
+       client := c.client
+       agentID := c.agentID
+       c.streamsMu.Unlock()
+
+       if agentID == "" {
+               return fmt.Errorf("agent ID not available, register agent first")
+       }
+
+       md := metadata.New(map[string]string{"agent_id": agentID})
+       ctxWithMetadata := metadata.NewOutgoingContext(ctx, md)
+
+       stream, streamErr := client.StreamClusterTopology(ctxWithMetadata)
+       if streamErr != nil {
+               return fmt.Errorf("failed to create cluster state stream: %w", streamErr)
+       }
+
+       c.streamsMu.Lock()
+       c.clusterStateStream = stream
+       c.streamsMu.Unlock()
+
+       go c.handleClusterStateStream(ctx, stream)
+
+       c.logger.Info().
+               Str("agent_id", agentID).
+               Msg("Cluster state stream established with Proxy")
+
+       return nil
+}
+
+// sendClusterTopology sends cluster topology to Proxy.
+func (c *Client) sendClusterTopology() error {
+       c.streamsMu.RLock()
+       if c.disconnected || c.clusterStateStream == nil {
+               c.streamsMu.RUnlock()
+               return fmt.Errorf("cluster state stream not established")
+       }
+       clusterStateStream := c.clusterStateStream
+       collector := c.clusterCollector
+       c.streamsMu.RUnlock()
+       if collector == nil {
+               return fmt.Errorf("cluster collector not available")
+       }
+       topology := collector.GetClusterTopology()
+       if len(topology.Nodes) == 0 && len(topology.Calls) == 0 {
+               return fmt.Errorf("no cluster topology available to send")
+       }
+       req := &fodcv1.StreamClusterTopologyRequest{
+               Topology: &fodcv1.Topology{
+                       Nodes: topology.Nodes,
+                       Calls: topology.Calls,
+               },
+               Timestamp: timestamppb.Now(),
+       }
+       if sendErr := clusterStateStream.Send(req); sendErr != nil {
+               return fmt.Errorf("failed to send cluster topology: %w", sendErr)
+       }
+       c.logger.Info().
+               Int("nodes_count", len(topology.Nodes)).
+               Int("calls_count", len(topology.Calls)).
+               Msg("Successfully sent cluster topology to proxy")
+       return nil
+}
+
+// handleClusterStateStream handles the cluster state stream.
+func (c *Client) handleClusterStateStream(ctx context.Context, stream fodcv1.FODCService_StreamClusterTopologyClient) {
+       c.streamsMu.RLock()
+       stopCh := c.stopCh
+       c.streamsMu.RUnlock()
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-stopCh:
+                       return
+               default:
+               }
+               resp, recvErr := stream.Recv()
+               if errors.Is(recvErr, io.EOF) {
+                       c.logger.Warn().Msg("Cluster state stream closed by Proxy, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if recvErr != nil {
+                       c.streamsMu.RLock()
+                       disconnected := c.disconnected
+                       c.streamsMu.RUnlock()
+                       if disconnected {
+                               c.logger.Debug().Err(recvErr).Msg("Cluster state stream closed")
+                               return
+                       }
+                       c.logger.Error().Err(recvErr).Msg("Error receiving from cluster state stream, reconnecting...")
+                       go c.reconnect(ctx)
+                       return
+               }
+               if resp != nil && resp.RequestTopology {
+                       c.logger.Debug().Msg("Received cluster data request from proxy")
+                       if sendErr := c.sendClusterTopology(); sendErr != nil {
+                               c.logger.Error().Err(sendErr).Msg("Failed to send cluster topology to proxy")
+                       }

Review Comment:
   When `sendClusterTopology` returns an error because no topology is available (lines 288-290), the error is only logged and the stream continues, so the condition is never surfaced to the proxy. Consider one of the following:
   1. Sending an empty topology response to acknowledge the request (sketched below)
   2. Implementing a retry mechanism
   3. Returning a distinct error that is not logged as a failure
   
   As written, the proxy cannot tell whether the agent truly has no topology or whether an error occurred.
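   
   A minimal sketch of option 1, reusing only the request types already shown in this diff; an empty topology acknowledges the proxy's request instead of being treated as an error:
   ```go
   topology := collector.GetClusterTopology()
   req := &fodcv1.StreamClusterTopologyRequest{
           Topology: &fodcv1.Topology{
                   Nodes: topology.Nodes, // may be empty
                   Calls: topology.Calls, // may be empty
           },
           Timestamp: timestamppb.Now(),
   }
   if sendErr := clusterStateStream.Send(req); sendErr != nil {
           return fmt.Errorf("failed to send cluster topology: %w", sendErr)
   }
   return nil
   ```
   With this shape the "no cluster topology available to send" early return would be dropped, and the proxy could distinguish an empty topology from a transport failure.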


