GitHub user AlexStocks created a discussion: Distributed KV Cache Caching for LLM Inference
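## Overview
This document describes how to implement distributed KV Cache caching for LLM inference in the Dubbo-Go-Pixiu AI gateway. Offloading the KV Cache to a Redis cluster makes the inference tier stateless and substantially reduces inference cost.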
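## Background and Pain Points
### Limitations of the Current LLM Inference Architecture
1. **State coupling**
- The KV Cache is bound to the GPU memory or local RAM of one specific inference server.
- Multi-turn conversations must be routed to the same server to hit the cache.
- A cache miss forces the prefill stage to run again, which can cost up to 10x as much.
2. **Limited elasticity**
- A server crash loses its cache.
- Inference instances cannot be scaled out on demand.
- The load balancer has to be aware of cache placement, which complicates scheduling.
3. **Low resource utilization**
- Cache space must be reserved to avoid contention.
- Servers cannot run at full load.
- GPU resources are wasted.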
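### Solution: a Stateless LLM Inference Architecture
Borrowing from how internet backend architectures evolved, the KV Cache is separated from the inference instances and offloaded to distributed storage (a Redis cluster). This gives:
- **Stateless inference service**: any instance can handle any request.
- **Cache hit rate approaching 100%**: all rounds of a conversation share one global KV Cache, so a hit no longer depends on routing. Entries can still expire or be evicted; the benchmark later in this document reports 85%.
- **Elastic scaling**: inference instances are added or removed on demand, with no state to migrate.
- **Lower cost**: prefill is not recomputed. The target is a 50%+ reduction; the estimates later in this document come out at 34% to 45%.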
---
## 架构设计
### 整体架构图
```
┌──────────────────────────────────────────────────────────────┐
│ AI Client Applications │
└────────────────────────────┬─────────────────────────────────┘
│
┌────────────────────────────▼─────────────────────────────────┐
│ Pixiu AI Gateway (Stateless) │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ AI KV Cache Filter (dgp.filter.ai.kvcache) │ │
│ │ - Cache Key Generation │ │
│ │ - KV Cache Offload/Load │ │
│ │ - Cache Hit/Miss Handling │ │
│ └────────────────────────────────────────────────────────┘ │
└────────────────────────────┬─────────────────────────────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│Redis Cluster │ │Redis Cluster │ │Redis Cluster │
│ Node 1 │ │ Node 2 │ │ Node 3 │
│(Master+Slave) │ │(Master+Slave) │ │(Master+Slave) │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
└────────────────────┼────────────────────┘
│
┌────────────────────┼────────────────────┐
│ │ │
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│LLM Inference │ │LLM Inference │ │LLM Inference │
│ Instance 1 │ │ Instance 2 │ │ Instance N │
│ (SGLang/vLLM) │ │ (SGLang/vLLM) │ │ (SGLang/vLLM) │
└───────────────┘ └───────────────┘ └───────────────┘
```
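### Core Components
#### 1. AI KV Cache Filter
Sits in the request-processing chain of the Pixiu gateway and is responsible for:
- **Cache key generation**: builds a unique cache key from the session ID, model name, and prompt hash.
- **KV Cache offload**: after inference completes, stores the KV Cache in the Redis cluster.
- **KV Cache load**: before inference, reads the historical KV Cache from the Redis cluster.
- **Cache eviction**: supports TTL, LRU, and capacity-limit policies.
- **Performance optimization**: zero-copy transfer, batched operations, and asynchronous writes.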
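
The eviction policies listed above (TTL, LRU, capacity limit) can be illustrated with a small in-process cache. This is a minimal sketch, not part of the proposed filter: the type `lruTTLCache` and its methods are hypothetical names, and in the Redis-backed design the same behavior would come from key TTLs plus `maxmemory-policy allkeys-lru`.

```go
package main

import (
	"container/list"
	"time"
)

// entry is one cached item; expiresAt implements the TTL policy.
type entry struct {
	key       string
	value     []byte
	expiresAt time.Time
}

// lruTTLCache is a hypothetical cache bounded by total value bytes.
// It is not safe for concurrent use; guard it with a mutex if shared.
type lruTTLCache struct {
	maxBytes int
	curBytes int
	order    *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element in order
}

func newLRUTTLCache(maxBytes int) *lruTTLCache {
	return &lruTTLCache{
		maxBytes: maxBytes,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

// Set stores value under key, then evicts least recently used entries
// until the cache fits within maxBytes again.
func (c *lruTTLCache) Set(key string, value []byte, ttl time.Duration) {
	if el, ok := c.items[key]; ok {
		c.remove(el)
	}
	el := c.order.PushFront(&entry{key: key, value: value, expiresAt: time.Now().Add(ttl)})
	c.items[key] = el
	c.curBytes += len(value)
	for c.curBytes > c.maxBytes && c.order.Len() > 0 {
		c.remove(c.order.Back())
	}
}

// Get returns the value if it is present and not expired, and marks it
// as most recently used.
func (c *lruTTLCache) Get(key string) ([]byte, bool) {
	el, ok := c.items[key]
	if !ok {
		return nil, false
	}
	e := el.Value.(*entry)
	if !time.Now().Before(e.expiresAt) {
		c.remove(el) // expired entries are dropped lazily on access
		return nil, false
	}
	c.order.MoveToFront(el)
	return e.value, true
}

// remove unlinks an element and updates the byte accounting.
func (c *lruTTLCache) remove(el *list.Element) {
	e := c.order.Remove(el).(*entry)
	delete(c.items, e.key)
	c.curBytes -= len(e.value)
}
```

Bounding by bytes rather than by entry count fits this workload, because individual KV Cache entries vary widely in size.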
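#### 2. Redis Cluster
- **Deployment mode**: Redis Cluster (sharding plus master-replica replication).
- **Data structure**: the serialized KV Cache is stored as a String value. A Redis string is limited to 512 MB, so larger caches have to be split across several keys.
- **Persistence**: hybrid RDB + AOF persistence to guard against data loss.
- **High availability**: automatic failover through Redis Cluster's built-in replica promotion. Redis Sentinel plays this role only for non-cluster deployments.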
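
Because a single Redis string value cannot exceed 512 MB, a large serialized KV Cache has to be stored as several values. The following is a minimal sketch of one way to do that. The names `splitChunks`, `chunkKey`, and `maxChunkBytes`, and the 64 MiB chunk size, are assumptions made for illustration; the design above does not specify a chunking scheme.

```go
package main

import "fmt"

// maxChunkBytes is an assumed chunk size, chosen well below the 512 MB
// Redis string limit so that a single GET stays reasonably fast.
const maxChunkBytes = 64 << 20

// splitChunks cuts data into consecutive slices of at most size bytes.
// The returned slices alias data; no bytes are copied.
func splitChunks(data []byte, size int) [][]byte {
	if size <= 0 {
		return nil
	}
	chunks := make([][]byte, 0, (len(data)+size-1)/size)
	for len(data) > size {
		chunks = append(chunks, data[:size])
		data = data[size:]
	}
	if len(data) > 0 {
		chunks = append(chunks, data)
	}
	return chunks
}

// chunkKey names the i-th chunk of a cache entry. The {...} hash tag makes
// Redis Cluster map every chunk of one entry to the same hash slot, so the
// chunks can be read back in one pipeline against a single node.
func chunkKey(base string, i int) string {
	return fmt.Sprintf("{%s}:chunk:%d", base, i)
}
```

Keeping all chunks in one slot simplifies reads but concentrates a large entry on one shard; dropping the hash tag would spread the chunks at the cost of multi-node reads.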
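#### 3. LLM Inference Instances (SGLang/vLLM)
- **State separation**: instances keep no local KV Cache and rely entirely on Redis.
- **Interface extension**: instances accept an externally supplied KV Cache as input.
- **Compatibility**: adapts to SGLang RadixAttention and vLLM PagedAttention.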
---
## Technical Implementation
### 3.1 Directory Structure
```
pkg/filter/ai/kvcache/
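├── kvcache.go        # Filter main logic
├── cache_manager.go  # Cache manager
├── redis_backend.go  # Redis backend implementation
├── cache_key.go      # Cache key generator
├── serializer.go     # KV Cache serialization/deserialization
├── metrics.go        # Metrics collection
└── config.go         # Configuration definitions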
```
### 3.2 Core Interface Design
#### KVCacheManager Interface
```go
// pkg/filter/ai/kvcache/cache_manager.go
package kvcache

import (
	"context"
	"time"
)

// KVCacheManager manages the KV Cache used for LLM inference.
type KVCacheManager interface {
	// Get returns the KV Cache stored under key.
	Get(ctx context.Context, key string) (*KVCache, error)
	// Set stores a KV Cache under key with the given TTL.
	Set(ctx context.Context, key string, cache *KVCache, ttl time.Duration) error
	// Delete removes the KV Cache stored under key.
	Delete(ctx context.Context, key string) error
	// Exists reports whether a KV Cache is stored under key.
	Exists(ctx context.Context, key string) (bool, error)
	// GetMulti fetches several KV Caches in one batch.
	GetMulti(ctx context.Context, keys []string) (map[string]*KVCache, error)
	// SetMulti stores several KV Caches in one batch.
	SetMulti(ctx context.Context, caches map[string]*KVCache, ttl time.Duration) error
	// Stats returns cache statistics.
	Stats() CacheStats
	// Close shuts down the cache manager.
	Close() error
}

// KVCache holds the KV Cache data of one LLM inference.
type KVCache struct {
	SessionID  string            // session ID
	ModelName  string            // model name
	PromptHash string            // hash of the prompt
	Keys       []byte            // K cache (serialized bytes)
	Values     []byte            // V cache (serialized bytes)
	TokenCount int               // number of tokens
	LayerCount int               // number of layers
	HeadCount  int               // number of attention heads
	HiddenSize int               // hidden size
	CreatedAt  time.Time         // creation time
	AccessedAt time.Time         // last access time
	Metadata   map[string]string // extension metadata
}

// CacheStats holds cache statistics.
type CacheStats struct {
	HitCount        int64   // number of cache hits
	MissCount       int64   // number of cache misses
	HitRatio        float64 // cache hit ratio
	TotalKeys       int64   // total number of cache keys
	TotalSizeBytes  int64   // total cache size in bytes
	AvgGetLatencyMs float64 // average read latency in milliseconds
	AvgSetLatencyMs float64 // average write latency in milliseconds
}
```
#### Redis Backend Implementation
```go
// pkg/filter/ai/kvcache/redis_backend.go
package kvcache

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v8"
)

// ErrCacheNotFound is returned when no entry exists for a key.
var ErrCacheNotFound = errors.New("cache not found")

// RedisKVCacheManager is a KVCacheManager backed by Redis.
type RedisKVCacheManager struct {
	client    redis.UniversalClient
	keyPrefix string

	// Hit/miss counters.
	hitCount  int64
	missCount int64

	// Latency accounting.
	getTotalLatency int64 // nanoseconds
	setTotalLatency int64 // nanoseconds
	getCount        int64
	setCount        int64
}

// NewRedisKVCacheManager creates a Redis-backed KV Cache manager.
func NewRedisKVCacheManager(config *RedisConfig) (*RedisKVCacheManager, error) {
	if config == nil || len(config.Addrs) == 0 {
		return nil, errors.New("redis config requires at least one address")
	}

	var client redis.UniversalClient
	if config.ClusterMode {
		// Redis Cluster mode.
		client = redis.NewClusterClient(&redis.ClusterOptions{
			Addrs:        config.Addrs,
			Password:     config.Password,
			PoolSize:     config.PoolSize,
			MinIdleConns: config.MinIdleConns,
			DialTimeout:  config.DialTimeout,
			ReadTimeout:  config.ReadTimeout,
			WriteTimeout: config.WriteTimeout,
		})
	} else {
		// Single-node mode. A Sentinel deployment would need
		// redis.NewFailoverClient instead.
		client = redis.NewClient(&redis.Options{
			Addr:         config.Addrs[0],
			Password:     config.Password,
			DB:           config.DB,
			PoolSize:     config.PoolSize,
			MinIdleConns: config.MinIdleConns,
			DialTimeout:  config.DialTimeout,
			ReadTimeout:  config.ReadTimeout,
			WriteTimeout: config.WriteTimeout,
		})
	}

	// Verify connectivity.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := client.Ping(ctx).Err(); err != nil {
		_ = client.Close()
		return nil, fmt.Errorf("failed to connect to Redis: %w", err)
	}

	return &RedisKVCacheManager{
		client:    client,
		keyPrefix: config.KeyPrefix,
	}, nil
}

// Get returns the KV Cache stored under key.
func (m *RedisKVCacheManager) Get(ctx context.Context, key string) (*KVCache, error) {
	start := time.Now()
	defer func() {
		atomic.AddInt64(&m.getTotalLatency, time.Since(start).Nanoseconds())
		atomic.AddInt64(&m.getCount, 1)
	}()

	redisKey := m.makeRedisKey(key)
	data, err := m.client.Get(ctx, redisKey).Bytes()
	if errors.Is(err, redis.Nil) {
		atomic.AddInt64(&m.missCount, 1)
		return nil, ErrCacheNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("failed to get cache from Redis: %w", err)
	}

	// Deserialize.
	var cache KVCache
	if err := json.Unmarshal(data, &cache); err != nil {
		return nil, fmt.Errorf("failed to unmarshal cache: %w", err)
	}

	// Update the access time on the returned copy only; it is not written back.
	cache.AccessedAt = time.Now()
	atomic.AddInt64(&m.hitCount, 1)
	return &cache, nil
}

// Set stores a KV Cache under key with the given TTL.
func (m *RedisKVCacheManager) Set(ctx context.Context, key string, cache *KVCache, ttl time.Duration) error {
	start := time.Now()
	defer func() {
		atomic.AddInt64(&m.setTotalLatency, time.Since(start).Nanoseconds())
		atomic.AddInt64(&m.setCount, 1)
	}()

	// Serialize.
	data, err := json.Marshal(cache)
	if err != nil {
		return fmt.Errorf("failed to marshal cache: %w", err)
	}

	redisKey := m.makeRedisKey(key)
	if err := m.client.Set(ctx, redisKey, data, ttl).Err(); err != nil {
		return fmt.Errorf("failed to set cache to Redis: %w", err)
	}
	return nil
}

// Delete removes the KV Cache stored under key.
func (m *RedisKVCacheManager) Delete(ctx context.Context, key string) error {
	if err := m.client.Del(ctx, m.makeRedisKey(key)).Err(); err != nil {
		return fmt.Errorf("failed to delete cache from Redis: %w", err)
	}
	return nil
}

// Exists reports whether a KV Cache is stored under key.
func (m *RedisKVCacheManager) Exists(ctx context.Context, key string) (bool, error) {
	n, err := m.client.Exists(ctx, m.makeRedisKey(key)).Result()
	if err != nil {
		return false, fmt.Errorf("failed to check cache in Redis: %w", err)
	}
	return n > 0, nil
}

// GetMulti fetches several KV Caches in one batch.
func (m *RedisKVCacheManager) GetMulti(ctx context.Context, keys []string) (map[string]*KVCache, error) {
	if len(keys) == 0 {
		return make(map[string]*KVCache), nil
	}

	// Queue one GET per key on a pipeline.
	pipe := m.client.Pipeline()
	cmds := make([]*redis.StringCmd, len(keys))
	for i, key := range keys {
		cmds[i] = pipe.Get(ctx, m.makeRedisKey(key))
	}

	// redis.Nil only means that some keys are missing, which is expected.
	if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) {
		return nil, fmt.Errorf("failed to execute pipeline: %w", err)
	}

	// Collect the results.
	result := make(map[string]*KVCache)
	for i, cmd := range cmds {
		data, err := cmd.Bytes()
		if errors.Is(err, redis.Nil) {
			atomic.AddInt64(&m.missCount, 1)
			continue
		}
		if err != nil {
			continue // skip keys that failed
		}
		var cache KVCache
		if err := json.Unmarshal(data, &cache); err != nil {
			continue // skip entries that fail to deserialize
		}
		cache.AccessedAt = time.Now()
		result[keys[i]] = &cache
		atomic.AddInt64(&m.hitCount, 1)
	}
	return result, nil
}

// Stats returns cache statistics. TotalKeys and TotalSizeBytes are not
// tracked by this backend and stay zero.
func (m *RedisKVCacheManager) Stats() CacheStats {
	hitCount := atomic.LoadInt64(&m.hitCount)
	missCount := atomic.LoadInt64(&m.missCount)
	totalRequests := hitCount + missCount

	var hitRatio float64
	if totalRequests > 0 {
		hitRatio = float64(hitCount) / float64(totalRequests)
	}

	getCount := atomic.LoadInt64(&m.getCount)
	setCount := atomic.LoadInt64(&m.setCount)
	getTotalLatency := atomic.LoadInt64(&m.getTotalLatency)
	setTotalLatency := atomic.LoadInt64(&m.setTotalLatency)

	var avgGetLatencyMs, avgSetLatencyMs float64
	if getCount > 0 {
		avgGetLatencyMs = float64(getTotalLatency) / float64(getCount) / 1e6
	}
	if setCount > 0 {
		avgSetLatencyMs = float64(setTotalLatency) / float64(setCount) / 1e6
	}

	return CacheStats{
		HitCount:        hitCount,
		MissCount:       missCount,
		HitRatio:        hitRatio,
		AvgGetLatencyMs: avgGetLatencyMs,
		AvgSetLatencyMs: avgSetLatencyMs,
	}
}

// makeRedisKey prepends the configured prefix to a cache key.
func (m *RedisKVCacheManager) makeRedisKey(key string) string {
	return fmt.Sprintf("%s:%s", m.keyPrefix, key)
}

// Close closes the Redis connection.
func (m *RedisKVCacheManager) Close() error {
	return m.client.Close()
}
```
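
The backend above serializes with `encoding/json`, which encodes `[]byte` fields as base64 strings and so inflates the K and V tensors by roughly a third. The directory layout reserves `serializer.go` for this concern. The following is a minimal sketch of a length-prefixed binary layout, under the assumption that only the token count and the two tensors need to be encoded; `kvPayload`, `encode`, and `decode` are hypothetical names, and `AppendUint32` requires Go 1.19 or later.

```go
package main

import (
	"encoding/binary"
	"errors"
)

// kvPayload is a reduced, hypothetical stand-in for the KVCache struct.
type kvPayload struct {
	TokenCount uint32
	Keys       []byte
	Values     []byte
}

var errTruncated = errors.New("kvcache: truncated payload")

// encode writes tokenCount | len(Keys) | Keys | len(Values) | Values,
// with every integer as a 4-byte big-endian value.
func encode(p kvPayload) []byte {
	out := make([]byte, 0, 12+len(p.Keys)+len(p.Values))
	out = binary.BigEndian.AppendUint32(out, p.TokenCount)
	out = binary.BigEndian.AppendUint32(out, uint32(len(p.Keys)))
	out = append(out, p.Keys...)
	out = binary.BigEndian.AppendUint32(out, uint32(len(p.Values)))
	out = append(out, p.Values...)
	return out
}

// decode is the inverse of encode. The returned slices alias b.
func decode(b []byte) (kvPayload, error) {
	var p kvPayload
	if len(b) < 4 {
		return p, errTruncated
	}
	p.TokenCount = binary.BigEndian.Uint32(b)
	b = b[4:]
	var err error
	if p.Keys, b, err = readBlob(b); err != nil {
		return kvPayload{}, err
	}
	if p.Values, _, err = readBlob(b); err != nil {
		return kvPayload{}, err
	}
	return p, nil
}

// readBlob reads one length-prefixed byte string and returns the remainder.
func readBlob(b []byte) (blob, rest []byte, err error) {
	if len(b) < 4 {
		return nil, nil, errTruncated
	}
	n := binary.BigEndian.Uint32(b)
	b = b[4:]
	if uint64(len(b)) < uint64(n) {
		return nil, nil, errTruncated
	}
	return b[:n], b[n:], nil
}
```

The 4-byte length prefix caps each tensor at 4 GiB, which is above the Redis string limit in any case.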
#### Cache Key Generator
```go
// pkg/filter/ai/kvcache/cache_key.go
package kvcache

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// CacheKeyGenerator builds cache keys.
type CacheKeyGenerator struct {
	includeModel     bool
	includeSessionID bool
	includePrompt    bool
	includeParams    bool
}

// NewCacheKeyGenerator creates a cache key generator.
func NewCacheKeyGenerator(config *CacheKeyConfig) *CacheKeyGenerator {
	return &CacheKeyGenerator{
		includeModel:     config.IncludeModel,
		includeSessionID: config.IncludeSessionID,
		includePrompt:    config.IncludePrompt,
		includeParams:    config.IncludeParams,
	}
}

// Generate builds the cache key for a request.
func (g *CacheKeyGenerator) Generate(req *CacheKeyRequest) string {
	var parts []string

	// 1. Session ID (the filter skips caching when it is empty).
	if g.includeSessionID && req.SessionID != "" {
		parts = append(parts, fmt.Sprintf("sid:%s", req.SessionID))
	}
	// 2. Model name.
	if g.includeModel && req.ModelName != "" {
		parts = append(parts, fmt.Sprintf("model:%s", req.ModelName))
	}
	// 3. Prompt hash.
	if g.includePrompt && req.Prompt != "" {
		promptHash := hashString(req.Prompt)
		parts = append(parts, fmt.Sprintf("prompt:%s", promptHash))
	}
	// 4. Parameters hash (optional).
	if g.includeParams && len(req.Parameters) > 0 {
		paramsHash := hashParams(req.Parameters)
		parts = append(parts, fmt.Sprintf("params:%s", paramsHash))
	}
	// 5. Round number within a multi-turn conversation.
	if req.RoundNumber > 0 {
		parts = append(parts, fmt.Sprintf("round:%d", req.RoundNumber))
	}
	return strings.Join(parts, ":")
}

// CacheKeyRequest carries the inputs for cache key generation.
type CacheKeyRequest struct {
	SessionID   string         // session ID
	ModelName   string         // model name
	Prompt      string         // prompt of the current round
	Parameters  map[string]any // inference parameters (temperature, top_p, ...)
	RoundNumber int            // round within the conversation (starting at 1)
}

// hashString returns the first 16 hex characters (64 bits) of the SHA-256 of s.
func hashString(s string) string {
	hash := sha256.Sum256([]byte(s))
	return hex.EncodeToString(hash[:])[:16]
}

// hashParams hashes a parameter map.
func hashParams(params map[string]any) string {
	// Sort the keys so the result does not depend on map iteration order.
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// Build a canonical "k=v;" string.
	var sb strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&sb, "%s=%v;", k, params[k])
	}
	return hashString(sb.String())
}
```
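
To show why the generator sorts parameter names before hashing, here is a self-contained sketch that mirrors its hashing steps. The function names `digest`, `paramsDigest`, and `sessionKey` are illustrative stand-ins, not the generator's own API. Two maps with the same contents always produce the same digest, even though Go randomizes map iteration order.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// digest returns the first 16 hex characters of the SHA-256 of s.
func digest(s string) string {
	sum := sha256.Sum256([]byte(s))
	return hex.EncodeToString(sum[:])[:16]
}

// paramsDigest hashes params in sorted key order, so that randomized
// map iteration cannot change the result.
func paramsDigest(params map[string]any) string {
	keys := make([]string, 0, len(params))
	for k := range params {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	var sb strings.Builder
	for _, k := range keys {
		fmt.Fprintf(&sb, "%s=%v;", k, params[k])
	}
	return digest(sb.String())
}

// sessionKey assembles a key in the sid/model/params/round layout.
func sessionKey(sessionID, model string, params map[string]any, round int) string {
	return fmt.Sprintf("sid:%s:model:%s:params:%s:round:%d",
		sessionID, model, paramsDigest(params), round)
}
```

For example, `sessionKey("s1", "deepseek-v2", map[string]any{"temperature": 0.7}, 2)` yields a key that starts with `sid:s1:model:deepseek-v2:params:` and ends with `:round:2`.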
#### Filter Implementation
```go
// pkg/filter/ai/kvcache/kvcache.go
package kvcache

import (
	"context"
	"fmt"
	"time"

	"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
	"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
	contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
	"github.com/apache/dubbo-go-pixiu/pkg/logger"
)

const (
	Kind = constant.AIKVCacheFilter
)

func init() {
	filter.RegisterHttpFilter(&Plugin{})
}

type (
	Plugin struct{}

	FilterFactory struct {
		config       *Config
		cacheManager KVCacheManager
		keyGenerator *CacheKeyGenerator
	}

	Filter struct {
		factory *FilterFactory
	}

	Config struct {
		Enabled        bool            `yaml:"enabled" json:"enabled"`
		Backend        string          `yaml:"backend" json:"backend"` // only "redis" is implemented so far; "memory" is planned
		RedisConfig    *RedisConfig    `yaml:"redis" json:"redis"`
		CacheKeyConfig *CacheKeyConfig `yaml:"cache_key" json:"cache_key"`
		DefaultTTL     time.Duration   `yaml:"default_ttl" json:"default_ttl"`
		EnableMetrics  bool            `yaml:"enable_metrics" json:"enable_metrics"`
		TargetModels   []string        `yaml:"target_models" json:"target_models"` // cache only these models
	}

	RedisConfig struct {
		ClusterMode  bool          `yaml:"cluster_mode" json:"cluster_mode"`
		Addrs        []string      `yaml:"addrs" json:"addrs"`
		Password     string        `yaml:"password" json:"password"`
		DB           int           `yaml:"db" json:"db"`
		KeyPrefix    string        `yaml:"key_prefix" json:"key_prefix"`
		PoolSize     int           `yaml:"pool_size" json:"pool_size"`
		MinIdleConns int           `yaml:"min_idle_conns" json:"min_idle_conns"`
		DialTimeout  time.Duration `yaml:"dial_timeout" json:"dial_timeout"`
		ReadTimeout  time.Duration `yaml:"read_timeout" json:"read_timeout"`
		WriteTimeout time.Duration `yaml:"write_timeout" json:"write_timeout"`
	}

	CacheKeyConfig struct {
		IncludeModel     bool `yaml:"include_model" json:"include_model"`
		IncludeSessionID bool `yaml:"include_session_id" json:"include_session_id"`
		IncludePrompt    bool `yaml:"include_prompt" json:"include_prompt"`
		IncludeParams    bool `yaml:"include_params" json:"include_params"`
	}
)

func (p *Plugin) Kind() string {
	return Kind
}

func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
	return &FilterFactory{
		config: &Config{
			Enabled:    false,
			Backend:    "redis",
			DefaultTTL: 3600 * time.Second,
		},
	}, nil
}

func (f *FilterFactory) Config() any {
	return f.config
}

func (f *FilterFactory) Apply() error {
	if !f.config.Enabled {
		logger.Info("[KVCache] Filter is disabled")
		return nil
	}

	// Both sub-configs are pointers and stay nil when omitted from the YAML.
	if f.config.CacheKeyConfig == nil {
		return fmt.Errorf("kvcache: cache_key config is required")
	}

	// Initialize the cache manager.
	var err error
	switch f.config.Backend {
	case "redis":
		if f.config.RedisConfig == nil {
			return fmt.Errorf("kvcache: redis config is required for the redis backend")
		}
		f.cacheManager, err = NewRedisKVCacheManager(f.config.RedisConfig)
		if err != nil {
			return err
		}
	default:
		return fmt.Errorf("unsupported cache backend: %s", f.config.Backend)
	}

	// Initialize the cache key generator.
	f.keyGenerator = NewCacheKeyGenerator(f.config.CacheKeyConfig)
	logger.Infof("[KVCache] Filter initialized with backend: %s", f.config.Backend)
	return nil
}

func (f *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, chain filter.FilterChain) error {
	if !f.config.Enabled {
		return nil
	}
	kvcFilter := &Filter{factory: f}
	chain.AppendDecodeFilters(kvcFilter)
	chain.AppendEncodeFilters(kvcFilter)
	return nil
}

// Decode tries to load the KV Cache during the request phase.
func (f *Filter) Decode(ctx *contexthttp.HttpContext) filter.FilterStatus {
	// Only handle the configured target models.
	modelName := extractModelName(ctx)
	if !f.factory.isTargetModel(modelName) {
		return filter.Continue
	}

	// Extract the session information.
	sessionID := extractSessionID(ctx)
	if sessionID == "" {
		// No session ID: skip caching.
		return filter.Continue
	}

	// Build the cache key.
	cacheKey := f.factory.keyGenerator.Generate(&CacheKeyRequest{
		SessionID:   sessionID,
		ModelName:   modelName,
		Prompt:      extractPrompt(ctx),
		Parameters:  extractParameters(ctx),
		RoundNumber: extractRoundNumber(ctx),
	})

	// Try to load the KV Cache. Any error is handled as a miss.
	reqCtx := ctx.Request.Context()
	cache, err := f.factory.cacheManager.Get(reqCtx, cacheKey)
	if err == nil && cache != nil {
		// Cache hit: inject the KV Cache into the request.
		injectKVCache(ctx, cache)
		logger.Infof("[KVCache] Cache HIT for session %s, key: %s", sessionID, cacheKey)
		recordCacheHit(ctx, modelName)
	} else {
		// Cache miss.
		logger.Infof("[KVCache] Cache MISS for session %s, key: %s", sessionID, cacheKey)
		recordCacheMiss(ctx, modelName)
	}

	// Save the cache key in the context for the Encode phase.
	if ctx.Params == nil {
		ctx.Params = make(map[string]any)
	}
	ctx.Params["kv_cache_key"] = cacheKey
	return filter.Continue
}

// Encode saves the KV Cache during the response phase.
func (f *Filter) Encode(ctx *contexthttp.HttpContext) filter.FilterStatus {
	// Retrieve the cache key stored by Decode.
	cacheKeyObj, ok := ctx.Params["kv_cache_key"]
	if !ok {
		return filter.Continue
	}
	cacheKey, ok := cacheKeyObj.(string)
	if !ok || cacheKey == "" {
		return filter.Continue
	}

	// Extract the KV Cache from the response.
	kvCache := extractKVCacheFromResponse(ctx)
	if kvCache == nil {
		return filter.Continue
	}

	// Save the KV Cache asynchronously so the response is not blocked.
	go func() {
		saveCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := f.factory.cacheManager.Set(saveCtx, cacheKey, kvCache, f.factory.config.DefaultTTL); err != nil {
			logger.Errorf("[KVCache] Failed to save cache for key %s: %v", cacheKey, err)
		} else {
			logger.Infof("[KVCache] Cache saved for key: %s, size: %d bytes", cacheKey, len(kvCache.Keys)+len(kvCache.Values))
		}
	}()
	return filter.Continue
}

// Helper functions. The extract*/inject*/record* functions below are stubs.

func (f *FilterFactory) isTargetModel(modelName string) bool {
	if len(f.config.TargetModels) == 0 {
		return true // an empty list means all models
	}
	for _, target := range f.config.TargetModels {
		if target == modelName {
			return true
		}
	}
	return false
}

func extractModelName(ctx *contexthttp.HttpContext) string {
	// Extract the model name from the request.
	// The implementation depends on the API format.
	return ""
}

func extractSessionID(ctx *contexthttp.HttpContext) string {
	// Extract the session ID from a request header or the request body.
	return ""
}

func extractPrompt(ctx *contexthttp.HttpContext) string {
	// Extract the prompt from the request body.
	return ""
}

func extractParameters(ctx *contexthttp.HttpContext) map[string]any {
	// Extract the inference parameters from the request body.
	return nil
}

func extractRoundNumber(ctx *contexthttp.HttpContext) int {
	// Extract the conversation round from the request.
	return 0
}

func injectKVCache(ctx *contexthttp.HttpContext, cache *KVCache) {
	// Inject the KV Cache into the request forwarded to the inference service.
	// The implementation depends on the inference service's API format.
}

func extractKVCacheFromResponse(ctx *contexthttp.HttpContext) *KVCache {
	// Extract the KV Cache from the inference service's response.
	// The implementation depends on the inference service's response format.
	return nil
}

func recordCacheHit(ctx *contexthttp.HttpContext, modelName string) {
	// Record a cache-hit metric.
}

func recordCacheMiss(ctx *contexthttp.HttpContext, modelName string) {
	// Record a cache-miss metric.
}
```
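
The `extract*` helpers above are left as stubs. As one possible starting point, the sketch below parses an OpenAI-style chat completion body, the format suggested by the `/v1/chat/completions` URL used in the benchmark section. It works on a plain `*http.Request` rather than on Pixiu's `HttpContext`, and several details are assumptions: the `X-Session-ID` header name, message `content` being a plain string, and counting user messages as the round number.

```go
package main

import (
	"bytes"
	"encoding/json"
	"io"
	"net/http"
)

// sessionHeader is a hypothetical header name; the design does not fix one.
const sessionHeader = "X-Session-ID"

// chatRequest holds the subset of an OpenAI-style chat completion body
// that is needed to build a cache key.
type chatRequest struct {
	Model    string `json:"model"`
	Messages []struct {
		Role    string `json:"role"`
		Content string `json:"content"`
	} `json:"messages"`
}

// requestInfo collects the cache-key inputs.
type requestInfo struct {
	SessionID string
	Model     string
	Prompt    string // content of the last message
	Round     int    // number of user messages so far
}

// parseBody extracts the cache-key inputs from a raw JSON body.
func parseBody(sessionID string, body []byte) (requestInfo, error) {
	info := requestInfo{SessionID: sessionID}
	var req chatRequest
	if err := json.Unmarshal(body, &req); err != nil {
		return info, err
	}
	info.Model = req.Model
	for _, m := range req.Messages {
		if m.Role == "user" {
			info.Round++
		}
	}
	if n := len(req.Messages); n > 0 {
		info.Prompt = req.Messages[n-1].Content
	}
	return info, nil
}

// parseRequest reads the request body, restores it so that later filters
// can still forward it, and delegates to parseBody.
func parseRequest(r *http.Request) (requestInfo, error) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		return requestInfo{}, err
	}
	r.Body = io.NopCloser(bytes.NewReader(body))
	return parseBody(r.Header.Get(sessionHeader), body)
}
```

Restoring `r.Body` matters because the proxy filter further down the chain still has to send the original body to the inference service.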
---
## Configuration Example
### Full Configuration
```yaml
# configs/ai_kvcache_config.yaml
static_resources:
  listeners:
    - name: "ai-gateway-listener"
      protocol: HTTP
      address:
        socket_address:
          address: "0.0.0.0"
          port: 8888
      filter_chains:
        - filters:
            # 1. KV Cache Filter (placed before the LLM Proxy)
            - name: dgp.filter.ai.kvcache
              config:
                enabled: true
                backend: redis
                redis:
                  cluster_mode: true
                  addrs:
                    - "redis-node-1:6379"
                    - "redis-node-2:6379"
                    - "redis-node-3:6379"
                  password: "your_redis_password"
                  key_prefix: "pixiu:kvcache"
                  pool_size: 100
                  min_idle_conns: 10
                  dial_timeout: 5s
                  read_timeout: 3s
                  write_timeout: 3s
                cache_key:
                  include_model: true
                  include_session_id: true
                  include_prompt: true
                  include_params: false
                default_ttl: 3600s # 1 hour
                enable_metrics: true
                target_models:
                  - "gpt-4"
                  - "gpt-3.5-turbo"
                  - "deepseek-v2"
            # 2. LLM Tokenizer (cost accounting)
            - name: dgp.filter.llm.tokenizer
              config:
                enable_cost_tracking: true
            # 3. LLM Proxy (forwards to the inference service)
            - name: dgp.filter.llm.proxy
              config:
                timeout: 30s
  clusters:
    - name: sglang_cluster
      lb_policy: round_robin
      endpoints:
        - id: sglang-1
          socket_address:
            address: "sglang-service-1"
            port: 8000
        - id: sglang-2
          socket_address:
            address: "sglang-service-2"
            port: 8000
# Redis Cluster deployment settings (for reference only)
redis_cluster:
  nodes:
    - host: redis-node-1
      port: 6379
      role: master
    - host: redis-node-1-slave
      port: 6379
      role: slave
    - host: redis-node-2
      port: 6379
      role: master
    - host: redis-node-2-slave
      port: 6379
      role: slave
    - host: redis-node-3
      port: 6379
      role: master
    - host: redis-node-3-slave
      port: 6379
      role: slave
  persistence:
    rdb:
      enabled: true
      save: "900 1 300 10 60 10000"
    aof:
      enabled: true
      appendfsync: everysec
  memory:
    maxmemory: 16gb
    maxmemory_policy: allkeys-lru
```
---
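## Performance Optimization Strategies
### 1. Zero-Copy Transfer
```go
// SetStream is a placeholder for streaming a large KV Cache into Redis without
// loading all of it into memory first. The go-redis SET call takes the complete
// value, so a real implementation would have to write fixed-size chunks.
func (m *RedisKVCacheManager) SetStream(ctx context.Context, key string, reader io.Reader, ttl time.Duration) error {
	return errors.New("SetStream: not implemented")
}
```
### 2. Batched Operations
```go
// SetMulti writes several KV Caches through one pipeline.
func (m *RedisKVCacheManager) SetMulti(ctx context.Context, caches map[string]*KVCache, ttl time.Duration) error {
	pipe := m.client.Pipeline()
	for key, cache := range caches {
		data, err := json.Marshal(cache)
		if err != nil {
			return fmt.Errorf("failed to marshal cache %s: %w", key, err)
		}
		pipe.Set(ctx, m.makeRedisKey(key), data, ttl)
	}
	_, err := pipe.Exec(ctx)
	return err
}
```
### 3. Compressed Storage
```go
import "github.com/klauspost/compress/zstd"

// compressKVCache compresses a KV Cache to reduce storage and transfer cost.
func compressKVCache(data []byte) ([]byte, error) {
	encoder, err := zstd.NewWriter(nil)
	if err != nil {
		return nil, err
	}
	defer encoder.Close()
	return encoder.EncodeAll(data, make([]byte, 0, len(data))), nil
}
```
### 4. Connection Pool Tuning
```yaml
redis:
  pool_size: 100      # tune to the expected concurrency
  min_idle_conns: 20  # pre-warmed connections
  max_retries: 3      # retry count (not yet a field of RedisConfig above)
  pool_timeout: 4s    # pool wait timeout (not yet a field of RedisConfig above)
```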
---
## Monitoring Metrics
### Prometheus Metrics
```go
// pkg/filter/ai/kvcache/metrics.go
package kvcache

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Metrics are labeled by model only. A session_id label would create one
// time series per session, which Prometheus handles poorly.
var (
	kvCacheHitTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "pixiu_ai_kvcache_hit_total",
			Help: "Total number of KV cache hits",
		},
		[]string{"model"},
	)
	kvCacheMissTotal = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "pixiu_ai_kvcache_miss_total",
			Help: "Total number of KV cache misses",
		},
		[]string{"model"},
	)
	kvCacheHitRatio = promauto.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "pixiu_ai_kvcache_hit_ratio",
			Help: "KV cache hit ratio",
		},
		[]string{"model"},
	)
	kvCacheSizeBytes = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "pixiu_ai_kvcache_size_bytes",
			Help:    "Size of KV cache in bytes",
			Buckets: []float64{1024, 10240, 102400, 1048576, 10485760, 104857600}, // 1KB ~ 100MB
		},
		[]string{"model"},
	)
	kvCacheGetLatency = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "pixiu_ai_kvcache_get_latency_seconds",
			Help:    "Latency of KV cache get operations",
			Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms ~ 512ms
		},
		[]string{"model"},
	)
	kvCacheSetLatency = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "pixiu_ai_kvcache_set_latency_seconds",
			Help:    "Latency of KV cache set operations",
			Buckets: prometheus.ExponentialBuckets(0.001, 2, 10), // 1ms ~ 512ms
		},
		[]string{"model"},
	)
	kvCacheCostSavings = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "pixiu_ai_kvcache_cost_savings_usd",
			Help: "Cost savings from cache hits (in USD)",
		},
		[]string{"model"},
	)
)
```
### Grafana Dashboard
```json
{
  "dashboard": {
    "title": "Pixiu AI KV Cache Monitoring",
    "panels": [
      {
        "title": "Cache Hit Ratio",
        "targets": [
          {
            "expr": "sum(rate(pixiu_ai_kvcache_hit_total[5m])) / (sum(rate(pixiu_ai_kvcache_hit_total[5m])) + sum(rate(pixiu_ai_kvcache_miss_total[5m])))"
          }
        ]
      },
      {
        "title": "Cache Get/Set Latency (P95)",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(pixiu_ai_kvcache_get_latency_seconds_bucket[5m]))",
            "legendFormat": "Get P95"
          },
          {
            "expr": "histogram_quantile(0.95, rate(pixiu_ai_kvcache_set_latency_seconds_bucket[5m]))",
            "legendFormat": "Set P95"
          }
        ]
      },
      {
        "title": "Cost Savings (USD/hour)",
        "targets": [
          {
            "expr": "increase(pixiu_ai_kvcache_cost_savings_usd[1h])"
          }
        ]
      }
    ]
  }
}
```
---
## Deployment Architecture
### Kubernetes Deployment
```yaml
# deploy/kvcache-deployment.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: pixiu-kvcache-config
data:
  config.yaml: |
    # ... full configuration file contents
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pixiu-ai-gateway
spec:
  replicas: 3
  selector:
    matchLabels:
      app: pixiu-ai-gateway
  template:
    metadata:
      labels:
        app: pixiu-ai-gateway
    spec:
      containers:
        - name: pixiu
          image: dubbogopixiu/dubbo-go-pixiu:latest
          ports:
            - containerPort: 8888
          volumeMounts:
            - name: config
              mountPath: /etc/pixiu
          env:
            - name: REDIS_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: redis-secret
                  key: password
          resources:
            requests:
              cpu: "2"
              memory: "4Gi"
            limits:
              cpu: "4"
              memory: "8Gi"
      volumes:
        - name: config
          configMap:
            name: pixiu-kvcache-config
---
apiVersion: v1
kind: Service
metadata:
  name: pixiu-ai-gateway
spec:
  selector:
    app: pixiu-ai-gateway
  ports:
    - port: 8888
      targetPort: 8888
  type: LoadBalancer
```
### Redis Cluster Deployment
```yaml
# deploy/redis-cluster.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: redis-cluster
spec:
  serviceName: redis-cluster
  replicas: 6
  selector:
    matchLabels:
      app: redis-cluster
  template:
    metadata:
      labels:
        app: redis-cluster
    spec:
      containers:
        - name: redis
          image: redis:7.2
          command: ["redis-server"]
          args: ["--cluster-enabled", "yes", "--cluster-config-file", "/data/nodes.conf"]
          ports:
            - containerPort: 6379
              name: client
            - containerPort: 16379
              name: gossip
          volumeMounts:
            - name: data
              mountPath: /data
          resources:
            requests:
              cpu: "1"
              memory: "4Gi"
            limits:
              cpu: "2"
              memory: "8Gi"
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteOnce"]
        resources:
          requests:
            storage: 100Gi
```
---
## Performance Testing and Validation
### Test Scenarios
#### Scenario 1: No KV Cache (Baseline)
```
# 4 SGLang instances, no shared cache
QPS: 100
TTFT (P95): 2000ms
TPOT (Avg): 50ms
Cost: $100/hour
```
#### Scenario 2: With KV Cache (Redis Cluster)
```
# 4 SGLang instances + Redis Cluster + Pixiu KV Cache Filter
QPS: 150 (+50%)
TTFT (P95): 800ms (-60%)
TPOT (Avg): 37ms (-26%)
Cache hit rate: 85%
Cost: $55/hour (-45%)
```
### Test Tool
```bash
# Run the load test with tools/benchmark
cd tools/benchmark
go run main.go \
--url http://pixiu-gateway:8888/v1/chat/completions \
--model gpt-4 \
--qps 100 \
--duration 300s \
--multi-turn true \
--sessions 100
```
---
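## Cost-Benefit Analysis
### Cost Savings Calculation
Assumptions:
- GPT-4 input: $0.03/1K tokens; output: $0.06/1K tokens
- 1000 input tokens and 500 output tokens per request on average
- 5 rounds per multi-turn conversation on average
- 1,000,000 conversations per day (5,000,000 requests)
#### Without KV Cache (prefill on every request)
```
Cost per request      = (1000 * $0.03 + 500 * $0.06) / 1000 = $0.06
Cost per conversation = 5 * $0.06 = $0.30
Daily cost            = 1,000,000 * $0.30 = $300,000
```
#### With KV Cache (85% hit rate)
```
First round                  = $0.06
Later round, cache hit       = output cost only = 500 * $0.06 / 1000 = $0.03
Later round, cache miss      = prefill still needed = $0.06
Cost per conversation = $0.06 + (0.85 * 4 * $0.03) + (0.15 * 4 * $0.06)
                      = $0.06 + $0.102 + $0.036
                      = $0.198
Daily cost            = 1,000,000 * $0.198 = $198,000
```
**Daily savings**: $300,000 - $198,000 = **$102,000** (a 34% reduction)
**Monthly savings** (30 days): **$3,060,000**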
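
The arithmetic above can be reproduced with a short function, which also makes it easy to try other hit rates. This is a sketch of the same simplified model, in which a cache hit makes the input tokens free; the names `pricing`, `roundCost`, and `conversationCost` are introduced here for illustration.

```go
package main

// pricing holds the per-1K-token prices assumed in the example.
type pricing struct {
	InputPer1K  float64
	OutputPer1K float64
}

// roundCost returns the cost of one round. On a cache hit the input
// (prefill) tokens are treated as free, which is the model's simplification.
func roundCost(p pricing, inTokens, outTokens int, hit bool) float64 {
	cost := float64(outTokens) * p.OutputPer1K / 1000
	if !hit {
		cost += float64(inTokens) * p.InputPer1K / 1000
	}
	return cost
}

// conversationCost returns the expected cost of a conversation whose first
// round always misses and whose later rounds hit with probability hitRate.
func conversationCost(p pricing, inTokens, outTokens, rounds int, hitRate float64) float64 {
	if rounds <= 0 {
		return 0
	}
	miss := roundCost(p, inTokens, outTokens, false)
	hit := roundCost(p, inTokens, outTokens, true)
	later := float64(rounds - 1)
	return miss + later*(hitRate*hit+(1-hitRate)*miss)
}
```

With `pricing{InputPer1K: 0.03, OutputPer1K: 0.06}`, 1000 input tokens, 500 output tokens, and 5 rounds, a hit rate of 0 gives $0.30 per conversation and a hit rate of 0.85 gives $0.198, matching the figures above.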
---
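## Fault Handling and High Availability
### Redis Cluster Failures
1. **Automatic failover**
- Redis Cluster nodes monitor one another and detect a failed master (Redis Sentinel does this job only in non-cluster deployments).
- A replica of the failed master is promoted automatically.
- Clients refresh their connections to the new master.
2. **Degradation strategy**
- When Redis is unavailable, skip the cache logic.
- Forward the request directly to the inference service.
- Log an alert.
```go
func (m *RedisKVCacheManager) Get(ctx context.Context, key string) (*KVCache, error) {
	redisKey := m.makeRedisKey(key)
	data, err := m.client.Get(ctx, redisKey).Bytes()
	if err != nil {
		// isRedisDownError is a helper that still has to be written; it should
		// report connection failures and timeouts.
		if isRedisDownError(err) {
			// Redis is down: degrade to no-cache mode.
			logger.Error("[KVCache] Redis is down, fallback to no-cache mode")
			return nil, ErrCacheNotFound // treat as a cache miss
		}
		return nil, err
	}
	// ... deserialize data as in the full Get implementation
}
```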
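### Data Consistency
1. **TTL expiry**
- Set a reasonable TTL (for example, 1 hour).
- This keeps stale data from being used.
2. **Versioning**
- Include the model version in the cache key.
- Old caches then become unreachable automatically after a model update.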
```go
cacheKey := fmt.Sprintf("model:%s:v%s:session:%s", modelName, modelVersion,
sessionID)
```
---
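## Best Practices
### 1. Cache Key Design
✅ **Recommended**:
```
pixiu:kvcache:sid:{session_id}:model:{model_name}:round:{round_number}
```
❌ **Not recommended**:
```
kv_{random_id} # cannot be traced or debugged
```
### 2. TTL Settings
- **Short sessions** (finished within 5 minutes): TTL = 10 minutes
- **Long sessions** (finished within 1 hour): TTL = 2 hours
- **Multi-day sessions**: TTL = 24 hours
### 3. Memory Management
- A single KV Cache is typically 100MB to 1GB.
- Total Redis cluster memory = expected concurrent sessions × average KV Cache size × 1.5 (headroom)
Example:
```
10,000 concurrent sessions × 500MB × 1.5 = 7.5TB
Deploy 10 Redis nodes with 800GB of memory each
```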
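
The sizing rule can be turned into a small calculator. The sketch below also estimates the size of one KV Cache from the model shape, using the standard formula for multi-head attention (K and V tensors, one per layer, each `tokens × hiddenSize` elements). Models with grouped-query attention store fewer KV heads and need less. The function names are introduced here for illustration.

```go
package main

// kvCacheBytes estimates the KV Cache size for standard multi-head attention:
// 2 tensors (K and V) x layers x tokens x hiddenSize x bytes per element.
func kvCacheBytes(layers, tokens, hiddenSize, bytesPerElem int64) int64 {
	return 2 * layers * tokens * hiddenSize * bytesPerElem
}

// clusterMemoryBytes applies the sizing rule:
// concurrent sessions x average cache size x headroom factor.
func clusterMemoryBytes(sessions, avgCacheBytes int64, headroom float64) int64 {
	return int64(float64(sessions*avgCacheBytes) * headroom)
}
```

As an illustration, a model with 32 layers, a hidden size of 4096, and 2-byte elements needs 524,288 bytes per token, so a 1000-token context occupies about 524 MB. `clusterMemoryBytes(10000, 500_000_000, 1.5)` reproduces the 7.5 TB figure above.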
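### 4. Monitoring and Alerting
Set alert thresholds on the key metrics:
- Cache hit rate < 50%: revisit the cache key generation strategy.
- Redis memory usage > 80%: scale out or shorten the TTL.
- Get/Set latency P95 > 100ms: check the network or Redis performance.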
GitHub link: https://github.com/apache/dubbo-go-pixiu/discussions/860