Chen-BUPT commented on code in PR #878:
URL: https://github.com/apache/dubbo-go-pixiu/pull/878#discussion_r2777080782
##########
pkg/filter/ai/kvcache/handlers.go:
##########
@@ -0,0 +1,183 @@
+package kvcache
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+ contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/logger"
+)
+
+func (f *Filter) manageCache(ctx context.Context, model string, prompt string,
rawBody []byte, cacheStatus *LookupResponse, lookupDone bool) {
+ if ctx.Err() != nil {
+ return
+ }
+ tokens, err := f.tokenManager.GetTokens(ctx, model, prompt, rawBody)
+ if err != nil {
+ logger.Warnf("[KVCache] tokenize failed: %v", err)
+ return
+ }
+ if ctx.Err() != nil {
+ return
+ }
+ if !lookupDone || cacheStatus == nil {
+ cacheStatus, err = f.lmcacheClient.Lookup(ctx,
&LookupRequest{Tokens: tokens})
+ if err != nil {
+ logger.Warnf("[KVCache] lookup failed: %v", err)
+ return
+ }
+ }
+ decision := f.cacheStrategy.MakeDecision(ctx, cacheStatus, model,
prompt)
+ if ctx.Err() != nil {
+ return
+ }
+ if err := f.cacheStrategy.ExecuteDecision(ctx, decision, tokens); err
!= nil {
+ logger.Warnf("[KVCache] execute strategy failed: %v", err)
+ }
+}
+
// readRequestBody returns the full request payload while leaving req readable
// for whoever consumes the body next.
//
// A nil request, or one with a nil Body, yields (nil, nil). When req.GetBody
// is set and returns a reader without error, the payload is read from that
// fresh copy and req.Body is left untouched. Otherwise req.Body is drained and
// closed, then replaced by an in-memory reader over the same bytes, and
// req.GetBody is installed so later readers can obtain further copies.
func readRequestBody(req *http.Request) ([]byte, error) {
	if req == nil || req.Body == nil {
		return nil, nil
	}
	if req.GetBody != nil {
		if replay, replayErr := req.GetBody(); replayErr == nil {
			defer replay.Close()
			return io.ReadAll(replay)
		}
	}

	payload, readErr := io.ReadAll(req.Body)
	if readErr != nil {
		return nil, readErr
	}
	req.Body.Close()

	req.GetBody = func() (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewReader(payload)), nil
	}
	// The closure above never returns an error, so it is safe to ignore here.
	req.Body, _ = req.GetBody()
	return payload, nil
}
+
+func extractPromptAndModel(body []byte) (string, string, error) {
+ if len(body) == 0 {
+ return "", "", nil
+ }
+ var payload map[string]any
+ if err := json.Unmarshal(body, &payload); err != nil {
+ return "", "", err
+ }
+ model, _ := payload["model"].(string)
+ prompt := coercePrompt(payload["prompt"])
+ if prompt == "" {
+ prompt = extractPromptFromMessages(payload["messages"])
+ }
+ return strings.TrimSpace(prompt), model, nil
+}
+
// coercePrompt converts the decoded value of a request's "prompt" field into a
// single string.
//
// A string is returned unchanged. For a []any, only the string elements are
// kept, in order, separated by "\n"; other elements (numbers, nil, objects)
// are skipped. Any other type, including nil, yields "".
func coercePrompt(value any) string {
	if text, isText := value.(string); isText {
		return text
	}
	items, isList := value.([]any)
	if !isList {
		return ""
	}
	var sb strings.Builder
	first := true
	for _, item := range items {
		text, isText := item.(string)
		if !isText {
			continue
		}
		if !first {
			sb.WriteByte('\n')
		}
		sb.WriteString(text)
		first = false
	}
	return sb.String()
}
+
// extractPromptFromMessages flattens a decoded chat-style "messages" array
// into a single prompt string, one message's text per line ("\n"-joined).
//
// Each element is expected to be a JSON object; other elements are skipped,
// and a value that is not a []any yields "". A message's "content" may be:
//   - a string, which is used as-is (an empty string still contributes an
//     empty line, as before), or
//   - an array of content parts, as accepted by OpenAI-compatible chat APIs.
//     Parts that carry a string "text" field (e.g. {"type":"text","text":"..."})
//     and bare string parts are kept and joined with "\n". Parts without text,
//     such as images, are ignored.
//
// A message whose content yields no text at all (missing, non-text parts only,
// or an unsupported type) is skipped.
func extractPromptFromMessages(value any) string {
	msgs, ok := value.([]any)
	if !ok {
		return ""
	}
	parts := make([]string, 0, len(msgs))
	for _, msg := range msgs {
		msgMap, ok := msg.(map[string]any)
		if !ok {
			continue
		}
		if text, ok := messageContentText(msgMap["content"]); ok {
			parts = append(parts, text)
		}
	}
	return strings.Join(parts, "\n")
}

// messageContentText returns the text carried by one message's "content"
// value and whether any text was found (see extractPromptFromMessages for the
// accepted shapes).
func messageContentText(content any) (string, bool) {
	switch c := content.(type) {
	case string:
		return c, true
	case []any:
		texts := make([]string, 0, len(c))
		for _, part := range c {
			switch p := part.(type) {
			case string:
				texts = append(texts, p)
			case map[string]any:
				if t, ok := p["text"].(string); ok {
					texts = append(texts, t)
				}
			}
		}
		if len(texts) == 0 {
			return "", false
		}
		return strings.Join(texts, "\n"), true
	default:
		return "", false
	}
}
+
+func selectPreferredInstanceID(resp *LookupResponse) string {
+ if resp == nil || len(resp.LayoutInfo) == 0 {
+ return ""
+ }
+ var (
+ selected string
+ maxCount int
+ )
+ for instanceID, layout := range resp.LayoutInfo {
+ if layout.TokenCount > maxCount || selected == "" {
+ selected = instanceID
+ maxCount = layout.TokenCount
+ }
+ }
+ return selected
+}
+
+func effectiveTimeout(hc *contexthttp.HttpContext, cfg *Config) time.Duration {
+ if cfg == nil {
+ return 0
+ }
+ timeout := cfg.RequestTimeout
+ if hc != nil && hc.Timeout > 0 && (timeout <= 0 || hc.Timeout <
timeout) {
+ timeout = hc.Timeout
+ }
+ if timeout <= 0 {
+ return 2 * time.Second
+ }
+ return timeout
+}
+
+func (f *Filter) tryRouteToCachedInstance(hc *contexthttp.HttpContext, model
string, prompt string) (*LookupResponse, bool) {
+ if f == nil || f.tokenManager == nil || f.lmcacheClient == nil {
+ return nil, false
+ }
+ tokens, ok := f.tokenManager.GetCachedTokens(model, prompt)
+ if !ok || len(tokens) == 0 {
+ logger.Debugf("[KVCache] routing lookup skipped: token cache
miss")
+ return nil, false
+ }
+ timeout := effectiveTimeout(hc, f.cfg)
+ if f.cfg != nil && f.cfg.LookupRoutingTimeout > 0 &&
f.cfg.LookupRoutingTimeout < timeout {
+ timeout = f.cfg.LookupRoutingTimeout
+ }
+ ctx, cancel := context.WithTimeout(hc.Ctx, timeout)
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]