hanahmily commented on code in PR #1200:
URL: https://github.com/apache/skywalking-banyandb/pull/1200#discussion_r3506744186


##########
fodc/agent/internal/cmd/root.go:
##########
@@ -139,6 +154,20 @@ func init() {
                "Directory where lifecycle sidecar writes report files")
        rootCmd.Flags().DurationVar(&lifecycleCacheTTL, "lifecycle-cache-ttl", 10*time.Minute,
                "TTL for cached lifecycle data. After expiry, the next collection call refreshes the cache")
+       rootCmd.Flags().BoolVar(&pressureProfilerEnabled, "pressure-profiler-enabled", true,
+               "Enable automatic heap+goroutine pprof capture when the monitored container's RSS approaches its cgroup memory limit")
+       rootCmd.Flags().IntVar(&pressureTriggerPercent, "pressure-profiler-trigger-percent", defaultPressureTriggerPercent,

Review Comment:
   `--pressure-profiler-trigger-percent` is unvalidated. In `OnPollComplete` 
the threshold is `limit * TriggerPercent / 100`, so:
   - `0` (or a negative value, which `IntVar` accepts) makes `threshold <= 0`, 
so `rss < threshold` is never true and capture fires on every poll (throttled 
only by the cooldown and the capturing CAS) regardless of real pressure;
   - a value `> 100` makes `threshold > limit`, which silently disables capture 
until RSS already exceeds the cgroup limit (i.e. the container is effectively 
already OOMing).
   
   Both are silent misconfigurations with no startup error. Please add a bounds 
check in `validateFlags()` (e.g. reject `< 1 || > 100`) with a clear message, 
matching how the other bounded flags are validated.
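
A minimal sketch of the requested bound, assuming nothing about the real 
`validateFlags()` beyond its name. `validateTriggerPercent`, `thresholdBytes`, 
and the two constants are hypothetical names used only for illustration; the 
threshold arithmetic is the `limit * TriggerPercent / 100` expression quoted 
above, done in signed math so the out-of-range cases are visible.

```go
package main

import "fmt"

// Hypothetical bounds; the PR may pick different ones.
const (
	minTriggerPercent = 1
	maxTriggerPercent = 100
)

// thresholdBytes mirrors the arithmetic quoted in the review comment:
// limit * percent / 100. Signed integers are an assumption made here so
// that negative percentages produce a visibly non-positive threshold.
func thresholdBytes(limit int64, percent int) int64 {
	return limit * int64(percent) / 100
}

// validateTriggerPercent is a hypothetical helper that validateFlags()
// could call to reject values outside [1, 100] at startup.
func validateTriggerPercent(percent int) error {
	if percent < minTriggerPercent || percent > maxTriggerPercent {
		return fmt.Errorf("--pressure-profiler-trigger-percent must be between %d and %d, got %d",
			minTriggerPercent, maxTriggerPercent, percent)
	}
	return nil
}

func main() {
	const limit = int64(1 << 30) // illustrative 1 GiB cgroup limit
	for _, p := range []int{-5, 0, 80, 150} {
		fmt.Printf("percent=%d threshold=%d err=%v\n",
			p, thresholdBytes(limit, p), validateTriggerPercent(p))
	}
}
```

With a check like this, both misconfigurations described above would fail at 
startup instead of silently changing capture behavior.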



##########
fodc/proxy/internal/pressure/aggregator.go:
##########
@@ -0,0 +1,280 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package pressure aggregates memory-pressure pprof profile metadata from all agents.
+package pressure
+
+import (
+       "context"
+       "strings"
+       "sync"
+       "time"
+
+       fodcv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// listTimeout bounds how long CollectProfiles waits for agents to finish streaming their
+// metadata. It is a fallback only: the collection normally returns as soon as every contacted
+// agent signals completion, well before this elapses.
+const listTimeout = 10 * time.Second
+
+// ProfileInfo is one pprof profile's metadata in the HTTP list response.
+type ProfileInfo struct {
+       Type      string `json:"type"`
+       Filename  string `json:"filename"`
+       Filepath  string `json:"filepath"`
+       Format    string `json:"format"`
+       SizeBytes int64  `json:"size_bytes"`
+}
+
+// AggregatedPressureProfile enriches a capture event with the agent's identity.
+type AggregatedPressureProfile struct {
+       CapturedAt       time.Time     `json:"captured_at"`
+       AgentID          string        `json:"agent_id"`
+       PodName          string        `json:"pod_name"`
+       Role             string        `json:"role"`
+       ProfileID        string        `json:"profile_id"`
+       SourceEndpoint   string        `json:"source_endpoint"`
+       Profiles         []ProfileInfo `json:"profiles"`
+       RSSBytes         uint64        `json:"rss_bytes"`
+       CgroupLimitBytes uint64        `json:"cgroup_limit_bytes"`
+       ThresholdBytes   uint64        `json:"threshold_bytes"`
+       TriggerPercent   uint32        `json:"trigger_percent"`
+}
+
+// Filter narrows the listed/collected profiles by role and pod name.
+type Filter struct {
+       Role    string
+       PodName string
+}
+
+// RequestSender drives the agents to stream their profile metadata. CollectList sends a
+// list command to every given agent under one request and returns a channel closed once all
+// of them have signaled completion.
+type RequestSender interface {
+       CollectList(agentIDs []string) <-chan struct{}
+}
+
+// Aggregator keeps a dedup cache of capture-event metadata nested by agentID -> profileID.
+// On each list request it pulls fresh metadata from every matching agent, waits a short
+// window, then returns the cache snapshot. The authoritative copy lives on the agent disk.
+type Aggregator struct {
+       registry    *registry.AgentRegistry
+       grpcService RequestSender
+       // cache is the live, queryable set: agentID -> profileID -> record.
+       cache map[string]map[string]*AggregatedPressureProfile
+       // staging holds the records of an in-progress list round, same shape as cache. A round's
+       // records land here as they stream in and the whole per-agent map is promoted to cache in
+       // one swap on ListComplete (FinalizeAgentList), so events the agent has evicted from its
+       // disk drop out of the cache instead of lingering as unservable entries.
+       staging map[string]map[string]*AggregatedPressureProfile
+       log     *logger.Logger
+       cacheMu sync.RWMutex
+       mu      sync.RWMutex
+}
+
+// NewAggregator creates a new Aggregator instance.
+func NewAggregator(reg *registry.AgentRegistry, grpcService RequestSender, log *logger.Logger) *Aggregator {
+       return &Aggregator{
+               registry:    reg,
+               grpcService: grpcService,
+               log:         log,
+               cache:       make(map[string]map[string]*AggregatedPressureProfile),
+               staging:     make(map[string]map[string]*AggregatedPressureProfile),
+       }
+}
+
+// SetGRPCService sets the gRPC service used to request profiles from agents.
+func (a *Aggregator) SetGRPCService(grpcService RequestSender) {
+       a.mu.Lock()
+       defer a.mu.Unlock()
+       a.grpcService = grpcService
+}
+
+// RemoveAgent drops all cached events for an agent (called on disconnect).
+func (a *Aggregator) RemoveAgent(agentID string) {
+       a.cacheMu.Lock()
+       defer a.cacheMu.Unlock()
+       delete(a.cache, agentID)
+       delete(a.staging, agentID)
+}
+
+// ProcessProfileFromAgent caches the metadata of one capture event, enriched with the
+// agent's pod name and role.
+func (a *Aggregator) ProcessProfileFromAgent(agentID string, agentInfo *registry.AgentInfo, record *fodcv1.PressureProfileRecord) {
+       if record == nil {
+               return
+       }
+       profiles := make([]ProfileInfo, 0, len(record.Profiles))
+       for _, p := range record.Profiles {
+               profiles = append(profiles, ProfileInfo{
+                       Type:      p.Type,
+                       Filename:  p.Filename,
+                       Filepath:  p.Filepath,
+                       Format:    p.Format,
+                       SizeBytes: p.SizeBytes,
+               })
+       }
+       agg := &AggregatedPressureProfile{
+               AgentID:          agentID,
+               PodName:          agentInfo.AgentIdentity.PodName,
+               Role:             agentInfo.AgentIdentity.Role,
+               ProfileID:        record.ProfileId,
+               SourceEndpoint:   record.SourceEndpoint,
+               RSSBytes:         record.RssBytes,
+               CgroupLimitBytes: record.CgroupLimitBytes,
+               TriggerPercent:   record.TriggerPercent,
+               ThresholdBytes:   record.ThresholdBytes,
+               Profiles:         profiles,
+       }
+       if record.CapturedAt != nil {
+               agg.CapturedAt = record.CapturedAt.AsTime()
+       }
+
+       a.cacheMu.Lock()
+       agentStaging := a.staging[agentID]
+       if agentStaging == nil {
+               agentStaging = make(map[string]*AggregatedPressureProfile)
+               a.staging[agentID] = agentStaging

Review Comment:
   This can repopulate `staging[agentID]` after `RemoveAgent` has already 
cleared it. Suppose a `Record` message for agent X is being processed here while 
X's connection tears down (`cleanupConnection -> RemoveAgent(X)` on another 
goroutine). If `RemoveAgent` deletes `staging[X]` first and this write then 
recreates it, the result is an orphaned staging entry. Because X disconnected, 
no `ListComplete` arrives, so `FinalizeAgentList(X)` never runs to promote or 
clear it; and since agent IDs are fresh per reconnect, nothing later cleans it 
up. The entry is never served (it is only in `staging`, not `cache`), so it is 
functionally harmless, but it leaks one map per disconnect-with-in-flight-record 
over the proxy's lifetime.
   
   Suggest guarding the write so it doesn't create a staging entry for an agent 
that is no longer registered/connected (the caller in `handlePressurePayload` 
already resolves `agentInfo` via the registry), or having `RemoveAgent` leave a 
short-lived tombstone that `ProcessProfileFromAgent` checks.
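
A rough sketch of the first option, using simplified stand-in types rather than 
the real `Aggregator`. The `live` set, `addAgent`, `removeAgent`, and `stage` 
are all hypothetical names; the only point illustrated is that the liveness 
check and the staging write happen under the same lock that removal takes, so a 
late record cannot recreate the map once the agent is gone.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a stand-in for the aggregated profile metadata.
type record struct{ ProfileID string }

// aggregator is a simplified, hypothetical model of the staging map.
type aggregator struct {
	mu      sync.Mutex
	live    map[string]struct{}           // agents currently connected
	staging map[string]map[string]*record // agentID -> profileID -> record
}

func newAggregator() *aggregator {
	return &aggregator{
		live:    make(map[string]struct{}),
		staging: make(map[string]map[string]*record),
	}
}

// addAgent marks an agent as connected.
func (a *aggregator) addAgent(agentID string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	a.live[agentID] = struct{}{}
}

// removeAgent clears liveness and staging in one critical section.
func (a *aggregator) removeAgent(agentID string) {
	a.mu.Lock()
	defer a.mu.Unlock()
	delete(a.live, agentID)
	delete(a.staging, agentID)
}

// stage stores a record only if the agent is still live; it reports
// whether the record was accepted.
func (a *aggregator) stage(agentID string, r *record) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	if _, ok := a.live[agentID]; !ok {
		return false // agent already removed: do not recreate staging
	}
	m := a.staging[agentID]
	if m == nil {
		m = make(map[string]*record)
		a.staging[agentID] = m
	}
	m[r.ProfileID] = r
	return true
}

func main() {
	a := newAggregator()
	a.addAgent("x")
	fmt.Println(a.stage("x", &record{ProfileID: "p1"}))
	a.removeAgent("x")
	fmt.Println(a.stage("x", &record{ProfileID: "p2"}), len(a.staging))
}
```

Unlike a tombstone, a liveness set needs no expiry, since an entry exists only 
while the agent is connected; whether the real registry can be consulted under 
`cacheMu` without lock-ordering problems is not something this sketch settles.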



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.