Copilot commented on code in PR #1200:
URL:
https://github.com/apache/skywalking-banyandb/pull/1200#discussion_r3502569061
##########
fodc/agent/internal/watchdog/watchdog.go:
##########
@@ -320,6 +330,13 @@ func (w *Watchdog) pollAndForward(ctx context.Context)
(context.Context, error)
"metric_count": fmt.Sprintf("%d", len(rawMetrics)),
})
+ w.mu.RLock()
+ hooks := w.postPollHooks
+ w.mu.RUnlock()
+ for _, hook := range hooks {
+ hook(ctx)
+ }
Review Comment:
`postPollHooks` is read under the mutex but the slice is then iterated
without copying. If `AddPostPollHook` is ever called concurrently with polling
(even accidentally), `append` can mutate the underlying slice array while this
loop is reading it, causing a data race. Clone the slice (copy its elements into
a new backing array, e.g. with `slices.Clone`) while holding the lock, then
iterate over the clone.
##########
fodc/proxy/internal/grpc/service.go:
##########
@@ -732,3 +912,214 @@ func (s *FODCService) RequestDiagnostics(agentID string)
error {
}
return agentConn.sendDiagnosticsRequest()
}
+
+// fetchChunkTimeout bounds the wait for each download chunk; it resets per
chunk, so a
+// large but steadily-streaming profile never trips it.
+const fetchChunkTimeout = 30 * time.Second
+
+// StreamPressureProfiles handles the bi-directional memory-pressure pprof
stream: agents
+// reply to list/fetch commands with metadata records or binary chunks.
+func (s *FODCService) StreamPressureProfiles(stream
fodcv1.FODCService_StreamPressureProfilesServer) error {
+ ctx := stream.Context()
+ agentID := s.getAgentIDFromContext(ctx)
+ if agentID == "" {
+ agentID = s.getAgentIDFromPeer(ctx)
+ if agentID != "" {
+ s.logger.Warn().Str("agent_id", agentID).
+ Msg("Agent ID not found in metadata, using peer
address fallback (this may be unreliable)")
+ }
+ }
+ if agentID == "" {
+ s.logger.Error().Msg("Agent ID not found in context metadata or
peer address for pressure profiles stream")
+ return status.Errorf(codes.Unauthenticated, "agent ID not found
in context or peer address")
+ }
+
+ s.connectionsMu.Lock()
+ existingConn, exists := s.connections[agentID]
+ if exists {
+ existingConn.setPressureProfilesStream(stream)
+ existingConn.updateActivity()
+ } else {
+ s.connections[agentID] = &agentConnection{
+ agentID: agentID,
+ pressureProfilesStream: stream,
+ lastActivity: time.Now(),
+ }
+ }
+ s.connectionsMu.Unlock()
+
+ for {
+ req, recvErr := stream.Recv()
+ if errors.Is(recvErr, io.EOF) {
+ s.logger.Debug().Str("agent_id", agentID).Msg("Pressure
profiles stream closed by agent")
+ return nil
+ }
+ if recvErr != nil {
+ if errors.Is(recvErr, context.Canceled) ||
errors.Is(recvErr, context.DeadlineExceeded) {
+ s.logger.Debug().Err(recvErr).Str("agent_id",
agentID).Msg("Pressure profiles stream closed")
+ } else if st, ok := status.FromError(recvErr); ok &&
(st.Code() == codes.Canceled || st.Code() == codes.DeadlineExceeded) {
+ s.logger.Debug().Err(recvErr).Str("agent_id",
agentID).Msg("Pressure profiles stream closed")
+ } else {
+ s.logger.Error().Err(recvErr).Str("agent_id",
agentID).Msg("Error receiving pressure profiles")
+ }
+ return recvErr
+ }
+ s.handlePressurePayload(agentID, req)
+ }
+}
+
+// handlePressurePayload routes one stream message: a metadata record to the
aggregator,
+// or a download chunk to the waiting fetch handler.
+func (s *FODCService) handlePressurePayload(agentID string, req
*fodcv1.StreamPressureProfilesRequest) {
+ switch payload := req.Payload.(type) {
+ case *fodcv1.StreamPressureProfilesRequest_Record:
+ if s.pressureAggregator == nil {
+ return
+ }
+ agentInfo, getErr := s.registry.GetAgentByID(agentID)
+ if getErr != nil {
+ s.logger.Error().Err(getErr).Str("agent_id",
agentID).Msg("Failed to get agent info for pressure profile record")
+ return
+ }
+ s.pressureAggregator.ProcessProfileFromAgent(agentID,
agentInfo, payload.Record)
+ case *fodcv1.StreamPressureProfilesRequest_Chunk:
+ s.connectionsMu.RLock()
+ conn := s.connections[agentID]
+ s.connectionsMu.RUnlock()
+ if conn != nil {
+ conn.deliverChunk(payload.Chunk)
+ }
+ case *fodcv1.StreamPressureProfilesRequest_ListComplete:
+ // Promote the agent's staged list into the cache before waking
the waiting
+ // CollectProfiles, so the snapshot it returns reflects this
round's full set
+ // (and drops events the agent has evicted).
+ if s.pressureAggregator != nil {
+ s.pressureAggregator.FinalizeAgentList(agentID)
+ }
+ s.ackList(payload.ListComplete.RequestId, agentID)
+ }
+}
+
+// collectListTTL bounds how long a list waiter lingers in the registry,
independent of the
+// aggregator's own wait timeout, so a request whose agents never all ack does
not leak.
+const collectListTTL = 30 * time.Second
+
+// CollectList sends a list_profiles command (under one request id) to every
agent and returns a
+// channel closed once each successfully-contacted agent has reported
ListComplete. Agents that
+// cannot be reached are dropped immediately so they never hold the channel
open.
+func (s *FODCService) CollectList(agentIDs []string) <-chan struct{} {
+ if len(agentIDs) == 0 {
+ done := make(chan struct{})
+ close(done)
+ return done
+ }
+ requestID := uuid.NewString()
+ waiter := &listWaiter{pending: make(map[string]struct{}), done:
make(chan struct{})}
+ for _, id := range agentIDs {
+ waiter.pending[id] = struct{}{}
+ }
+
+ s.listMu.Lock()
+ s.listWaiters[requestID] = waiter
+ s.listMu.Unlock()
+ // Bound the registry entry's lifetime and guarantee the waiter
unblocks even if some agent
+ // never acks, regardless of whether collectListTTL stays above the
aggregator's timeout.
+ time.AfterFunc(collectListTTL, func() {
+ s.listMu.Lock()
+ delete(s.listWaiters, requestID)
+ s.listMu.Unlock()
+ waiter.forceDone()
+ })
+
Review Comment:
`CollectList` registers each `listWaiter` in `s.listWaiters` but only
removes it via the fixed TTL timer. Even when all agents ack immediately, the
entry (and its timer) linger for up to 30s, so frequent list calls can
accumulate many waiters/timers unnecessarily. Remove the waiter from the map as
soon as `waiter.done` closes (and stop the TTL timer) to avoid transient leaks
under load.
##########
test/e2e-v2/cases/fodc-pressure/e2e-mac.yaml:
##########
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# LOCAL macOS-ADAPTED copy of e2e.yaml. Identical scenario, but drops the
CI-only
+# tool-install setup steps (install kubectl / install yq / set PATH) because
those
+# scripts download Linux binaries that cannot execute on macOS. The local
machine
+# already has darwin kubectl/python3/curl/go on PATH. Do NOT commit this file.
Review Comment:
This file is committed into the repository, but its header says "Do NOT
commit this file." That makes the documentation misleading for future
maintainers and reviewers. Either remove this file from the PR, or update the
header comment to reflect that it is intentionally kept in-repo for local macOS
runs.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org