Copilot commented on code in PR #1323:
URL: https://github.com/apache/dubbo-admin/pull/1323#discussion_r2354166615
##########
ai/schema/react_schema.go:
##########
@@ -0,0 +1,136 @@
+package schema
+
+import (
+    "dubbo-admin-ai/internal/tools"
+    "encoding/json"
+    "fmt"
+    "reflect"
+    "strings"
+
+    "github.com/firebase/genkit/go/ai"
+    "github.com/invopop/jsonschema"
+)
+
+// StreamChunk represents streaming status information for ReAct Agent
+type StreamChunk struct {
+    Stage string `json:"stage"` // "think" | "act"

Review Comment:
   The Stage field comment uses string literals to document allowed values. Consider using constants or an enum type to make these values more maintainable and prevent typos.

```suggestion
// StageType represents the allowed values for the StreamChunk Stage field.
type StageType string

const (
    StageThink StageType = "think"
    StageAct   StageType = "act"
)

// StreamChunk represents streaming status information for ReAct Agent
type StreamChunk struct {
    Stage StageType `json:"stage"` // see StageThink, StageAct
```

##########
ai/plugins/dashscope/dashscope.go:
##########
@@ -84,7 +64,7 @@ func (o *DashScope) Init(ctx context.Context, g *genkit.Genkit) error {
     }
 
     if apiKey == "" {
-        return fmt.Errorf("DashScope plugin initialization failed: apiKey is required")
+        panic("DashScope plugin initialization failed: apiKey is required")
     }

Review Comment:
   Using panic() for missing API key is too aggressive. Consider returning an error instead to allow graceful error handling by callers.

##########
ai/plugins/siliconflow/siliconflow.go:
##########
@@ -77,7 +60,7 @@ func (o *SiliconFlow) Init(ctx context.Context, g *genkit.Genkit) error {
     }
 
     if apiKey == "" {
-        return fmt.Errorf("siliconflow plugin initialization failed: apiKey is required")
+        panic("SiliconFlow plugin initialization failed: apiKey is required")
    }

Review Comment:
   Using panic() for missing API key is too aggressive. Consider returning an error instead to allow graceful error handling by callers.
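Both plugin comments ask for the same change: keep Init's existing error return rather than panicking. A minimal sketch of that shape follows; the first lines mirror the code the PR replaced, while the caller-side handling and names (`dashScope`, the slog call) are illustrative assumptions, not code from this PR:

```go
// Inside Init: return the error instead of panicking.
if apiKey == "" {
    return fmt.Errorf("DashScope plugin initialization failed: apiKey is required")
}

// Hypothetical caller: it can now decide whether a missing key is fatal,
// e.g. log a warning and skip the provider instead of crashing the process.
if err := dashScope.Init(ctx, g); err != nil {
    slog.Warn("DashScope provider disabled", "reason", err)
}
```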
##########
ai/internal/memory/memory.go:
##########
@@ -0,0 +1,482 @@
+package memory
+
+import (
+    "strings"
+    "time"
+
+    "github.com/firebase/genkit/go/ai"
+)
+
+const (
+    SystemWindowLimit = 10
+    CoreWindowLimit   = 20
+    ChatWindowLimit   = 50
+
+    // Block size limits
+    MaxBlockSize      = 2048 // Maximum Block character count
+    DefaultImportance = 5    // Default importance level
+    MaxImportance     = 10   // Maximum importance level
+)
+
+type Memory struct {
+    SystemWindow *MemoryWindow
+    CoreWindow   *MemoryWindow
+    ChatWindow   *MemoryWindow
+}
+
+type MemoryWindow struct {
+    Blocks           []MemoryBlock
+    EvictionStrategy EvictionStrategy
+    WindowSize       int
+}
+
+type MemoryBlock interface {
+    CompileToPrompt() *ai.Prompt
+    Limit() int
+    Size() int
+    Priority() int
+    UpdatePriority() error
+    Reserved() bool
+    Split(maxSize int) ([]MemoryBlock, error)
+}
+
+type EvictionStrategy interface {
+    OnAccess(window *MemoryWindow) error
+    OnInsert(window *MemoryWindow) error
+    Evict(window *MemoryWindow) error
+}
+
+type FIFO struct {
+}
+
+type LRU struct {
+}
+
+// SystemMemoryBlock system memory block
+type SystemMemoryBlock struct {
+    content   string
+    timestamp int64
+    priority  int
+    reserved  bool
+}
+
+func (s *SystemMemoryBlock) CompileToPrompt() *ai.Prompt {
+    return nil
+}
+
+func (s *SystemMemoryBlock) Limit() int {
+    return MaxBlockSize
+}
+
+func (s *SystemMemoryBlock) Size() int {
+    return len(s.content)
+}
+
+func (s *SystemMemoryBlock) Priority() int {
+    return s.priority
+}
+
+func (s *SystemMemoryBlock) UpdatePriority() error {
+    s.timestamp = time.Now().UnixNano()
+    s.priority = -int(s.timestamp / 1e6) // Convert to milliseconds and negate
+    return nil
+}
+
+func (s *SystemMemoryBlock) Reserved() bool {
+    return s.reserved
+}
+
+func (s *SystemMemoryBlock) Split(maxSize int) ([]MemoryBlock, error) {
+    if s.Size() <= maxSize {
+        return []MemoryBlock{s}, nil
+    }
+
+    parts := splitBySemanticBoundary(s.content, maxSize)
+    blocks := make([]MemoryBlock, len(parts))
+
+    for i, part := range parts {
+        blocks[i] = &SystemMemoryBlock{
+            content:   part,
+            timestamp: s.timestamp,
+            priority:  s.priority,
+            reserved:  s.reserved,
+        }
+    }
+
+    return blocks, nil
+}
+
+// CoreMemoryBlock core memory block
+type CoreMemoryBlock struct {
+    content    string
+    timestamp  int64
+    priority   int
+    importance int
+    reserved   bool
+}
+
+func (c *CoreMemoryBlock) CompileToPrompt() *ai.Prompt {
+    // TODO: Need genkit registry to create prompt, return nil for now
+    return nil
+}
+
+func (c *CoreMemoryBlock) Limit() int {
+    return MaxBlockSize
+}
+
+func (c *CoreMemoryBlock) Size() int {
+    return len(c.content)
+}
+
+func (c *CoreMemoryBlock) Priority() int {
+    return c.priority
+}
+
+func (c *CoreMemoryBlock) UpdatePriority() error {
+    c.timestamp = time.Now().UnixMilli()
+    c.priority = calculatePriority(c.timestamp, c.importance)
+    return nil
+}
+
+func (c *CoreMemoryBlock) Reserved() bool {
+    return c.reserved
+}
+
+func (c *CoreMemoryBlock) Split(maxSize int) ([]MemoryBlock, error) {
+    if c.Size() <= maxSize {
+        return []MemoryBlock{c}, nil
+    }
+
+    parts := splitBySemanticBoundary(c.content, maxSize)
+    blocks := make([]MemoryBlock, len(parts))
+
+    for i, part := range parts {
+        blocks[i] = &CoreMemoryBlock{
+            content:    part,
+            timestamp:  c.timestamp,
+            priority:   c.priority,
+            importance: c.importance,
+            reserved:   c.reserved,
+        }
+    }
+
+    return blocks, nil
+}
+
+// ChatHistoryMemoryBlock chat history memory block
+type ChatHistoryMemoryBlock struct {
+    content   string
+    timestamp int64
+    priority  int
+    reserved  bool
+}
+
+func (ch *ChatHistoryMemoryBlock) CompileToPrompt() *ai.Prompt {
+    // TODO: Need genkit registry to create prompt, return nil for now
+    return nil
+}
+
+func (ch *ChatHistoryMemoryBlock) Limit() int {
+    return MaxBlockSize
+}
+
+func (ch *ChatHistoryMemoryBlock) Size() int {
+    return len(ch.content)
+}
+
+func (ch *ChatHistoryMemoryBlock) Priority() int {
+    return ch.priority
+}
+
+func (ch *ChatHistoryMemoryBlock) UpdatePriority() error {
+    ch.timestamp = time.Now().UnixNano()
+    ch.priority = -int(ch.timestamp / 1e6) // Convert to milliseconds and negate
+    return nil
+}
+
+func (ch *ChatHistoryMemoryBlock) Reserved() bool {
+    return ch.reserved
+}
+
+func (ch *ChatHistoryMemoryBlock) Split(maxSize int) ([]MemoryBlock, error) {
+    if ch.Size() <= maxSize {
+        return []MemoryBlock{ch}, nil
+    }
+
+    parts := splitBySemanticBoundary(ch.content, maxSize)
+    blocks := make([]MemoryBlock, len(parts))
+
+    for i, part := range parts {
+        blocks[i] = &ChatHistoryMemoryBlock{
+            content:   part,
+            timestamp: ch.timestamp,
+            priority:  ch.priority,
+            reserved:  ch.reserved,
+        }
+    }
+
+    return blocks, nil
+}
+
+// FIFO strategy implementation
+func (f *FIFO) OnAccess(window *MemoryWindow) error {
+    // FIFO does not need to update priority on access
+    return nil
+}
+
+func (f *FIFO) OnInsert(window *MemoryWindow) error {
+    if len(window.Blocks) > 0 {
+        lastBlock := window.Blocks[len(window.Blocks)-1]
+        err := lastBlock.UpdatePriority()
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (f *FIFO) Evict(window *MemoryWindow) error {
+    if len(window.Blocks) == 0 {
+        return nil
+    }
+
+    minPriority := int(^uint(0) >> 1) // Max int value
+    evictIndex := -1
+
+    for i, block := range window.Blocks {
+        if !block.Reserved() && block.Priority() < minPriority {
+            minPriority = block.Priority()
+            evictIndex = i
+        }
+    }
+
+    if evictIndex >= 0 {
+        // Remove found block
+        window.Blocks = append(window.Blocks[:evictIndex], window.Blocks[evictIndex+1:]...)
+    }
+
+    return nil
+}
+
+// LRU strategy implementation
+func (l *LRU) OnAccess(window *MemoryWindow) error {
+    // Update priority of recently accessed block
+    if len(window.Blocks) > 0 {
+        lastBlock := window.Blocks[len(window.Blocks)-1]
+        err := lastBlock.UpdatePriority()
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (l *LRU) OnInsert(window *MemoryWindow) error {
+    if len(window.Blocks) > 0 {
+        lastBlock := window.Blocks[len(window.Blocks)-1]
+        err := lastBlock.UpdatePriority()
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
+func (l *LRU) Evict(window *MemoryWindow) error {
+    if len(window.Blocks) == 0 {
+        return nil
+    }
+
+    minPriority := int(^uint(0) >> 1) // Max int value
+    evictIndex := -1
+
+    for i, block := range window.Blocks {
+        if !block.Reserved() && block.Priority() < minPriority {
+            minPriority = block.Priority()
+            evictIndex = i
+        }
+    }
+
+    if evictIndex >= 0 {
+        // Remove found block
+        window.Blocks = append(window.Blocks[:evictIndex], window.Blocks[evictIndex+1:]...)
+    }
+
+    return nil
+}
+
+// MemoryWindow method implementation
+func (mw *MemoryWindow) Insert(block MemoryBlock) error {
+    // Check if eviction is needed
+    for mw.NeedsEviction() {
+        err := mw.EvictionStrategy.Evict(mw)
+        if err != nil {
+            return err
+        }
+    }
+
+    // Insert new block
+    mw.Blocks = append(mw.Blocks, block)
+
+    // Trigger insert event
+    return mw.EvictionStrategy.OnInsert(mw)
+}
+
+func (mw *MemoryWindow) NeedsEviction() bool {
+    return len(mw.Blocks) >= mw.WindowSize
+}
+
+func (mw *MemoryWindow) FindBlock(predicate func(MemoryBlock) bool) *MemoryBlock {
+    for i := range mw.Blocks {
+        if predicate(mw.Blocks[i]) {
+            // Trigger access event
+            mw.EvictionStrategy.OnAccess(mw)
+            return &mw.Blocks[i]
+        }
+    }
+    return nil
+}
+
+// Memory method implementation
+func NewMemory() *Memory {
+    return &Memory{
+        SystemWindow: &MemoryWindow{
+            Blocks:           make([]MemoryBlock, 0),
+            EvictionStrategy: &FIFO{},
+            WindowSize:       SystemWindowLimit,
+        },
+        CoreWindow: &MemoryWindow{
+            Blocks:           make([]MemoryBlock, 0),
+            EvictionStrategy: &LRU{},
+            WindowSize:       CoreWindowLimit,
+        },
+        ChatWindow: &MemoryWindow{
+            Blocks:           make([]MemoryBlock, 0),
+            EvictionStrategy: &LRU{},
+            WindowSize:       ChatWindowLimit,
+        },
+    }
+}
+
+func (m *Memory) AddSystemMemory(content string, reserved bool) error {
+    block := &SystemMemoryBlock{
+        content:   content,
+        timestamp: time.Now().UnixNano(),
+        priority:  0,
+        reserved:  reserved,
+    }
+    block.UpdatePriority()
+
+    // Check if splitting is needed
+    if block.Size() > MaxBlockSize {
+        blocks, err := block.Split(MaxBlockSize)
+        if err != nil {
+            return err
+        }
+        for _, b := range blocks {
+            err := m.SystemWindow.Insert(b)
+            if err != nil {
+                return err
+            }
+        }
+    } else {
+        return m.SystemWindow.Insert(block)
+    }
+
+    return nil
+}
+
+func (m *Memory) AddCoreMemory(content string, importance int, reserved bool) error {
+    block := &CoreMemoryBlock{
+        content:    content,
+        timestamp:  time.Now().UnixNano(),
+        priority:   0,
+        importance: importance,
+        reserved:   reserved,
+    }
+    block.UpdatePriority()
+
+    // Check if splitting is needed
+    if block.Size() > MaxBlockSize {
+        blocks, err := block.Split(MaxBlockSize)
+        if err != nil {
+            return err
+        }
+        for _, b := range blocks {
+            err := m.CoreWindow.Insert(b)
+            if err != nil {
+                return err
+            }
+        }
+    } else {
+        return m.CoreWindow.Insert(block)
+    }
+
+    return nil
+}
+
+func (m *Memory) AddChatHistory(content string) error {
+    block := &ChatHistoryMemoryBlock{
+        content:   content,
+        timestamp: time.Now().UnixNano(),
+        priority:  0,
+        reserved:  false,
+    }
+    block.UpdatePriority()
+
+    // Check if splitting is needed
+    if block.Size() > MaxBlockSize {
+        blocks, err := block.Split(MaxBlockSize)
+        if err != nil {
+            return err
+        }
+        for _, b := range blocks {
+            err := m.ChatWindow.Insert(b)
+            if err != nil {
+                return err
+            }
+        }
+    } else {
+        return m.ChatWindow.Insert(block)
+    }
+
+    return nil
+}
+
+func (m *Memory) CompileAllToPrompt() *ai.Prompt {
+    // TODO: Need genkit registry to create unified prompt, return nil for now
+    return nil
+}
+
+// Helper function implementation
+func splitBySemanticBoundary(content string, maxSize int) []string {
+    if len(content) <= maxSize {
+        return []string{content}
+    }
+
+    var parts []string
+    sentences := strings.Split(content, ".")
+    currentPart := ""
+
+    for _, sentence := range sentences {
+        testPart := currentPart + sentence + "."
+        if len(testPart) > maxSize && currentPart != "" {
+            parts = append(parts, strings.TrimSpace(currentPart))
+            currentPart = sentence + "."
+        } else {
+            currentPart = testPart
+        }
+    }
+
+    if currentPart != "" {
+        parts = append(parts, strings.TrimSpace(currentPart))
+    }
+
+    return parts
+}
+
+func calculatePriority(timestamp int64, importance int) int {
+    timeScore := -int(timestamp / 1e6)      // Higher timestamp means higher priority
+    importanceBonus := importance * 1000000 // Importance bonus
+    return timeScore + importanceBonus

Review Comment:
   The magic number `1000000` used in importance bonus calculation should be defined as a named constant to improve code maintainability and make the weighting factor explicit.
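A minimal sketch of the named-constant form this comment suggests; the constant name `importanceWeight` is an illustrative assumption, not something defined in the PR:

```go
// importanceWeight makes the weighting factor between importance and recency
// explicit (the PR currently uses the literal 1000000 inline).
const importanceWeight = 1000000

func calculatePriority(timestamp int64, importance int) int {
    timeScore := -int(timestamp / 1e6)               // Higher timestamp means higher priority
    importanceBonus := importance * importanceWeight // Importance bonus
    return timeScore + importanceBonus
}
```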
{ + manager.GetLogger().Info("Think Done.", "output", out) + }() + + history, ok := ctx.Value(memory.ChatHistoryKey).(*memory.History) + if !ok { + panic(fmt.Errorf("failed to get history from context")) + } + + var resp *ai.ModelResponse + // ai.WithStreaming() receives ai.ModelStreamCallback type callback function + // This callback function is called when the model generates each raw streaming chunk, used for raw chunk processing + + // The passed cb is user-defined callback function for handling streaming data logic, such as printing + if !history.IsEmpty() { + resp, err = prompt.Execute(ctx, + ai.WithInput(in), + ai.WithMessages(history.AllHistory()...), + ai.WithStreaming(streamFunc(cb)), + ) + } else { + resp, err = prompt.Execute(ctx, + ai.WithInput(in), + ai.WithStreaming(streamFunc(cb)), + ) + } + + // Parse output + var response ThinkOut + err = resp.Output(&response) + + if err != nil { + return out, fmt.Errorf("failed to parse agentThink prompt response: %w", err) + } + + history.AddHistory(resp.History()...) + return response, nil + }) +} + +func think(g *genkit.Genkit, prompt ai.Prompt) agent.NormalFlow { + return genkit.DefineFlow(g, agent.ThinkFlowName, + func(ctx context.Context, in schema.Schema) (out schema.Schema, err error) { + manager.GetLogger().Info("Thinking...", "input", in) + defer func() { + manager.GetLogger().Info("Think Done.", "output", out) + }() + + history, ok := ctx.Value(memory.ChatHistoryKey).(*memory.History) + if !ok { + return out, fmt.Errorf("failed to get history from context") + } + + var resp *ai.ModelResponse + if !history.IsEmpty() { + resp, err = prompt.Execute(ctx, + ai.WithInput(in), + ai.WithMessages(history.AllHistory()...), + ) + } else { + resp, err = prompt.Execute(ctx, + ai.WithInput(in), + ) + } + if err != nil { + return nil, fmt.Errorf("failed to execute agentThink prompt: %w", err) + } + + // Parse output + var response ThinkOut + err = resp.Output(&response) + + if err != nil { + return out, fmt.Errorf("failed to parse agentThink prompt response: %w", err) + } + + history.AddHistory(resp.History()...) + return response, nil + }) +} + +// act: Core logic of the executor +func act(g *genkit.Genkit) agent.NormalFlow { + return genkit.DefineFlow(g, agent.ActFlowName, + func(ctx context.Context, in schema.Schema) (out schema.Schema, err error) { + manager.GetLogger().Info("Acting...", "input", in) + defer func() { + manager.GetLogger().Info("Act Done.", "output", out) + }() + + input, ok := in.(ActIn) + if !ok { + return nil, fmt.Errorf("input is not of type ActIn, got %T", in) + } + + var actOuts ActOut + + // Execute tool calls + history, ok := ctx.Value(memory.ChatHistoryKey).(*memory.History) + if !ok { + panic(fmt.Errorf("failed to get history from context")) Review Comment: Using panic() for context value retrieval failures is too aggressive. Consider returning an error instead to allow graceful error handling. 
```suggestion
                return nil, fmt.Errorf("failed to get history from context")
```

##########
ai/internal/manager/manager.go:
##########
@@ -2,62 +2,83 @@ package manager
 
 import (
     "context"
-
-    "dubbo-admin-ai/internal/config"
+    "dubbo-admin-ai/config"
+    "sync"
 
     "dubbo-admin-ai/plugins/dashscope"
     "dubbo-admin-ai/plugins/siliconflow"
     "dubbo-admin-ai/utils"
-    "fmt"
     "log/slog"
     "os"
     "path/filepath"
     "strings"
     "time"
 
-    "github.com/firebase/genkit/go/core/logger"
+    "github.com/dusted-go/logging/prettylog"
     "github.com/firebase/genkit/go/genkit"
     "github.com/firebase/genkit/go/plugins/googlegenai"
     "github.com/joho/godotenv"
     "github.com/lmittmann/tint"
 )
 
 var (
-    globalGenkit *genkit.Genkit
-    rootContext  *context.Context
-    globalLogger *slog.Logger
+    registry  *genkit.Genkit
+    gloLogger *slog.Logger
+    once      sync.Once
 )
 
-func InitGlobalGenkit(defaultModel string) (err error) {
-    ctx := context.Background()
-    if rootContext == nil {
-        rootContext = &ctx
+func Init(modelName string, logger *slog.Logger) {
+    once.Do(func() {
+        loadEnvVars()
+        registry = defaultRegistry(modelName)
+        gloLogger = logger
+
+        if logger == nil {
+            gloLogger = DevLogger()
+        }
+    })
+}
+
+// Load environment variables from PROJECT_ROOT/.env file
+func loadEnvVars() {
+    dotEnvFilePath := filepath.Join(config.PROJECT_ROOT, ".env")
+    dotEnvExampleFilePath := filepath.Join(config.PROJECT_ROOT, ".env.example")
+
+    // Check if the .env file exists, if not, copy .env.example to .env
+    if _, err := os.Stat(dotEnvFilePath); os.IsNotExist(err) {
+        if err = utils.CopyFile(dotEnvExampleFilePath, dotEnvFilePath); err != nil {
+            panic(err)
+        }
    }

Review Comment:
   Using panic() in loadEnvVars function is too aggressive for file operations. Consider returning an error to allow graceful handling of missing or inaccessible files.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org
For additional commands, e-mail: notifications-h...@dubbo.apache.org