mrproliu commented on code in PR #1200:
URL: https://github.com/apache/skywalking-banyandb/pull/1200#discussion_r3510072522


##########
fodc/proxy/internal/pressure/aggregator.go:
##########
@@ -0,0 +1,280 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package pressure aggregates memory-pressure pprof profile metadata from all 
agents.
+package pressure
+
+import (
+       "context"
+       "strings"
+       "sync"
+       "time"
+
+       fodcv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/fodc/v1"
+       "github.com/apache/skywalking-banyandb/fodc/proxy/internal/registry"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// listTimeout bounds how long CollectProfiles waits for agents to finish 
streaming their
+// metadata. It is a fallback only: the collection normally returns as soon as 
every contacted
+// agent signals completion, well before this elapses.
+const listTimeout = 10 * time.Second
+
+// ProfileInfo is one pprof profile's metadata in the HTTP list response.
+type ProfileInfo struct {
+       Type      string `json:"type"`
+       Filename  string `json:"filename"`
+       Filepath  string `json:"filepath"`
+       Format    string `json:"format"`
+       SizeBytes int64  `json:"size_bytes"`
+}
+
+// AggregatedPressureProfile enriches a capture event with the agent's 
identity.
+type AggregatedPressureProfile struct {
+       CapturedAt       time.Time     `json:"captured_at"`
+       AgentID          string        `json:"agent_id"`
+       PodName          string        `json:"pod_name"`
+       Role             string        `json:"role"`
+       ProfileID        string        `json:"profile_id"`
+       SourceEndpoint   string        `json:"source_endpoint"`
+       Profiles         []ProfileInfo `json:"profiles"`
+       RSSBytes         uint64        `json:"rss_bytes"`
+       CgroupLimitBytes uint64        `json:"cgroup_limit_bytes"`
+       ThresholdBytes   uint64        `json:"threshold_bytes"`
+       TriggerPercent   uint32        `json:"trigger_percent"`
+}
+
+// Filter narrows the listed/collected profiles by role and pod name.
+type Filter struct {
+       Role    string
+       PodName string
+}
+
+// RequestSender drives the agents to stream their profile metadata. 
CollectList sends a
+// list command to every given agent under one request and returns a channel 
closed once all
+// of them have signaled completion.
+type RequestSender interface {
+       CollectList(agentIDs []string) <-chan struct{}
+}
+
+// Aggregator keeps a dedup cache of capture-event metadata nested by agentID 
-> profileID.
+// On each list request it pulls fresh metadata from every matching agent, 
waits a short
+// window, then returns the cache snapshot. The authoritative copy lives on 
the agent disk.
+type Aggregator struct {
+       registry    *registry.AgentRegistry
+       grpcService RequestSender
+       // cache is the live, queryable set: agentID -> profileID -> record.
+       cache map[string]map[string]*AggregatedPressureProfile
+       // staging holds the records of an in-progress list round, same shape 
as cache. A round's
+       // records land here as they stream in and the whole per-agent map is 
promoted to cache in
+       // one swap on ListComplete (FinalizeAgentList), so events the agent 
has evicted from its
+       // disk drop out of the cache instead of lingering as unservable 
entries.
+       staging map[string]map[string]*AggregatedPressureProfile
+       log     *logger.Logger
+       cacheMu sync.RWMutex
+       mu      sync.RWMutex
+}
+
+// NewAggregator creates a new Aggregator instance.
+func NewAggregator(reg *registry.AgentRegistry, grpcService RequestSender, log 
*logger.Logger) *Aggregator {
+       return &Aggregator{
+               registry:    reg,
+               grpcService: grpcService,
+               log:         log,
+               cache:       
make(map[string]map[string]*AggregatedPressureProfile),
+               staging:     
make(map[string]map[string]*AggregatedPressureProfile),
+       }
+}
+
+// SetGRPCService sets the gRPC service used to request profiles from agents.
+func (a *Aggregator) SetGRPCService(grpcService RequestSender) {
+       a.mu.Lock()
+       defer a.mu.Unlock()
+       a.grpcService = grpcService
+}
+
+// RemoveAgent drops all cached events for an agent (called on disconnect).
+func (a *Aggregator) RemoveAgent(agentID string) {
+       a.cacheMu.Lock()
+       defer a.cacheMu.Unlock()
+       delete(a.cache, agentID)
+       delete(a.staging, agentID)
+}
+
+// ProcessProfileFromAgent caches the metadata of one capture event, enriched 
with the
+// agent's pod name and role.
+func (a *Aggregator) ProcessProfileFromAgent(agentID string, agentInfo 
*registry.AgentInfo, record *fodcv1.PressureProfileRecord) {
+       if record == nil {
+               return
+       }
+       profiles := make([]ProfileInfo, 0, len(record.Profiles))
+       for _, p := range record.Profiles {
+               profiles = append(profiles, ProfileInfo{
+                       Type:      p.Type,
+                       Filename:  p.Filename,
+                       Filepath:  p.Filepath,
+                       Format:    p.Format,
+                       SizeBytes: p.SizeBytes,
+               })
+       }
+       agg := &AggregatedPressureProfile{
+               AgentID:          agentID,
+               PodName:          agentInfo.AgentIdentity.PodName,
+               Role:             agentInfo.AgentIdentity.Role,
+               ProfileID:        record.ProfileId,
+               SourceEndpoint:   record.SourceEndpoint,
+               RSSBytes:         record.RssBytes,
+               CgroupLimitBytes: record.CgroupLimitBytes,
+               TriggerPercent:   record.TriggerPercent,
+               ThresholdBytes:   record.ThresholdBytes,
+               Profiles:         profiles,
+       }
+       if record.CapturedAt != nil {
+               agg.CapturedAt = record.CapturedAt.AsTime()
+       }
+
+       a.cacheMu.Lock()
+       agentStaging := a.staging[agentID]
+       if agentStaging == nil {
+               agentStaging = make(map[string]*AggregatedPressureProfile)
+               a.staging[agentID] = agentStaging

Review Comment:
   The staging map has already been deleted, so there is no need to process it again.



##########
fodc/agent/internal/cmd/root.go:
##########
@@ -139,6 +154,20 @@ func init() {
                "Directory where lifecycle sidecar writes report files")
        rootCmd.Flags().DurationVar(&lifecycleCacheTTL, "lifecycle-cache-ttl", 
10*time.Minute,
                "TTL for cached lifecycle data. After expiry, the next 
collection call refreshes the cache")
+       rootCmd.Flags().BoolVar(&pressureProfilerEnabled, 
"pressure-profiler-enabled", true,
+               "Enable automatic heap+goroutine pprof capture when the 
monitored container's RSS approaches its cgroup memory limit")
+       rootCmd.Flags().IntVar(&pressureTriggerPercent, 
"pressure-profiler-trigger-percent", defaultPressureTriggerPercent,

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to