Copilot commented on code in PR #733: URL: https://github.com/apache/dubbo-go-pixiu/pull/733#discussion_r2269437508
########## pkg/filter/llm/tokenizer/tokenizer.go: ########## @@ -24,138 +24,313 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" + "time" +) + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" Review Comment: The 'go.opentelemetry.io/otel/metric/global' package is deprecated. Use 'go.opentelemetry.io/otel' instead and obtain the meter from the global meter provider (`otel.GetMeterProvider()`) or via dependency injection. ```suggestion "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" ``` ########## pkg/filter/llm/tokenizer/tokenizer.go: ########## @@ -24,138 +24,313 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" + "time" +) + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" Review Comment: The sync instrument packages have been reorganized in newer OpenTelemetry versions. Use the newer metric API imports instead. ```suggestion ``` ########## pkg/filter/llm/tokenizer/tokenizer.go: ########## @@ -24,138 +24,313 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" + "time" +) + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" Review Comment: The sync instrument packages have been reorganized in newer OpenTelemetry versions. Use the newer metric API imports instead. 
```suggestion ``` ########## pkg/filter/llm/tokenizer/tokenizer.go: ########## @@ -24,138 +24,313 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" + "time" +) + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/client" "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http" + "github.com/apache/dubbo-go-pixiu/pkg/filter/llm/proxy" "github.com/apache/dubbo-go-pixiu/pkg/logger" ) const ( + // General constants Kind = constant.LLMTokenizerFilter LoggerFmt = "[Tokenizer] [DOWNSTREAM] " PromptTokensDetails = "prompt_tokens_details" + + // Metric meter name + meterName = "dubbo-go-pixiu.ai-gateway" + + // Attribute Keys for metrics + attrClusterName = "cluster_name" + attrEndpointID = "endpoint_id" + attrEndpointAddress = "endpoint_address" + attrStatusCode = "status_code" + attrErrorType = "error_type" + attrModel = "model" + + // JSON Keys from LLM response + jsonKeyUsage = "usage" + jsonKeyModel = "model" + jsonKeyPromptTokens = "prompt_tokens" + jsonKeyCompletionTokens = "completion_tokens" + jsonKeyTotalTokens = "total_tokens" + + // Server-Sent Events (SSE) constants + sseDone = "[DONE]" + sseDataPrefix = "data:" + + // Metric Names + metricPromptTokens = "pixiu_llm_prompt_tokens_total" + metricCompletionTokens = "pixiu_llm_completion_tokens_total" + metricTotalTokens = "pixiu_llm_total_tokens_total" + metricUpstreamRequests = "pixiu_llm_upstream_requests_total" + metricUpstreamSuccess = "pixiu_llm_upstream_requests_success_total" + metricUpstreamFailure = "pixiu_llm_upstream_requests_failure_total" + metricTotalDurationSum = 
"pixiu_llm_total_duration_microseconds_sum_total" + metricTTFTSum = "pixiu_llm_time_to_first_token_milliseconds_sum_total" + metricStreamingRequests = "pixiu_llm_streaming_requests_total" +) + +var ( + // Metric Instruments + llmPromptTokens syncint64.Counter + llmCompletionTokens syncint64.Counter + llmTotalTokens syncint64.Counter + llmUpstreamRequestsTotal syncint64.Counter + llmUpstreamRequestsSuccessTotal syncint64.Counter + llmUpstreamRequestsFailureTotal syncint64.Counter + llmTotalDurationSum syncfloat64.Counter + llmTimeToFirstTokenSum syncfloat64.Counter + llmStreamingRequestsTotal syncint64.Counter ) func init() { filter.RegisterHttpFilter(&Plugin{}) } type ( - // Plugin is http filter plugin. - Plugin struct { - } - // FilterFactory is http filter instance - FilterFactory struct { - cfg *Config + Plugin struct{} + FilterFactory struct{ cfg *Config } + Filter struct { + cfg *Config + start time.Time + recordTTFTOnce sync.Once } - // Filter is http filter instance - Filter struct { - cfg *Config - } - // Config describes the config of FilterFactory Config struct { + LogToConsole bool `yaml:"log_to_console" json:"log_to_console,omitempty" default:"false"` } ) -func (p *Plugin) Kind() string { - return Kind -} +func (p *Plugin) Kind() string { return Kind } func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) { return &FilterFactory{cfg: &Config{}}, nil } -func (factory *FilterFactory) Config() any { - return factory.cfg -} +func (factory *FilterFactory) Config() any { return factory.cfg } -func (factory *FilterFactory) Apply() error { - return nil -} +func (factory *FilterFactory) Apply() error { return registerLLMMetrics() } func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, chain filter.FilterChain) error { - f := &Filter{ - cfg: factory.cfg, - } + f := &Filter{cfg: factory.cfg} + chain.AppendDecodeFilters(f) chain.AppendEncodeFilters(f) return nil } +func (f *Filter) Decode(hc *contexthttp.HttpContext) 
filter.FilterStatus { + f.start = time.Now() + return filter.Continue +} + func (f *Filter) Encode(hc *contexthttp.HttpContext) filter.FilterStatus { - encoding := hc.Writer.Header().Get(constant.HeaderKeyContentEncoding) + // Report all metrics from a central place + f.reportUpstreamMetrics(hc) + f.reportTotalDurationMetric(hc) + // Handle response body for token counting and TTFT + encoding := hc.Writer.Header().Get(constant.HeaderKeyContentEncoding) switch res := hc.TargetResp.(type) { case *client.StreamResponse: pr, pw := io.Pipe() res.Stream = newTeeReadCloser(res.Stream, pw) - go f.processStreamResponse(pr, encoding) + go f.processStreamResponse(hc, pr, encoding) case *client.UnaryResponse: - f.processUsageData(res.Data, encoding) // Unary response is not a stream + f.processUnaryResponse(hc, res.Data, encoding) default: - logger.Warnf(LoggerFmt+"Response type not suitable for token calc: %T", res) + logger.Warnf(LoggerFmt+"Response type not suitable for token calculation: %T", res) } - return filter.Continue } -func (f *Filter) processStreamResponse(body io.Reader, encoding string) { - // For streams, we decompress the entire stream first, then process its content. - // The content itself (with "data:" prefixes) is passed to processUsageData. - decompressedData, ok := decompress(body, encoding) +// reportUpstreamMetrics reads attempt data from context and reports endpoint-level metrics. 
+func (f *Filter) reportUpstreamMetrics(hc *contexthttp.HttpContext) { + attemptsVal, ok := hc.Params[proxy.LLMUpstreamAttemptsKey] + if !ok { + return // No attempt data found, likely not an LLM proxy request + } + attempts, ok := attemptsVal.([]proxy.UpstreamAttempt) if !ok { + logger.Warnf(LoggerFmt+"Upstream attempt data in context has wrong type: %T", attemptsVal) return } - decompressedDataTrim := strings.TrimPrefix(string(decompressedData), "data:") + for _, attempt := range attempts { + attrs := []attribute.KeyValue{ + attribute.String(attrClusterName, attempt.ClusterName), + attribute.String(attrEndpointID, attempt.EndpointID), + attribute.String(attrEndpointAddress, attempt.EndpointAddress), + } + + llmUpstreamRequestsTotal.Add(hc.Ctx, 1, attrs...) + + if attempt.Success { + successAttrs := append(attrs, attribute.String(attrStatusCode, strconv.Itoa(attempt.StatusCode))) + llmUpstreamRequestsSuccessTotal.Add(hc.Ctx, 1, successAttrs...) + } else { + failureAttrs := append(attrs, + attribute.String(attrStatusCode, strconv.Itoa(attempt.StatusCode)), + attribute.String(attrErrorType, attempt.ErrorType), + ) + llmUpstreamRequestsFailureTotal.Add(hc.Ctx, 1, failureAttrs...) + } + } +} + +// reportTotalDurationMetric calculates and reports the total request-response duration. +func (f *Filter) reportTotalDurationMetric(hc *contexthttp.HttpContext) { + totalDuration := time.Since(f.start) + clusterName := "unknown" + if rEntry := hc.GetRouteEntry(); rEntry != nil { + clusterName = rEntry.Cluster + } + durationAttrs := []attribute.KeyValue{ + attribute.String(attrClusterName, clusterName), + attribute.String(attrStatusCode, strconv.Itoa(hc.GetStatusCode())), + } + llmTotalDurationSum.Add(hc.Ctx, float64(totalDuration.Microseconds()), durationAttrs...) +} + +// processStreamResponse handles streaming responses to calculate TTFT and count tokens. 
+func (f *Filter) processStreamResponse(hc *contexthttp.HttpContext, body io.Reader, encoding string) { + streamStartTime := time.Now() + decompressedReader, err := getDecompressedReader(body, encoding) + if err != nil { + logger.Errorf(LoggerFmt+"could not create decompressing reader: %v", err) + return + } + defer decompressedReader.Close() + + buf := make([]byte, 4096) + var eventBuffer bytes.Buffer + + for { + n, err := decompressedReader.Read(buf) + if n > 0 { + eventBuffer.Write(buf[:n]) + for { + event, remaining, found := splitSSEEvent(eventBuffer.Bytes()) + if !found { + break + } + jsonData := strings.TrimSpace(strings.TrimPrefix(string(event), sseDataPrefix)) + if len(jsonData) > 0 { + f.parseAndReportTokens(hc, []byte(jsonData)) + } + eventBuffer.Reset() + eventBuffer.Write(remaining) + } + } + + if err == io.EOF { + if eventBuffer.Len() > 0 { + jsonData := strings.TrimSpace(strings.TrimPrefix(eventBuffer.String(), sseDataPrefix)) + if len(jsonData) > 0 { + f.parseAndReportTokens(hc, []byte(jsonData)) + } + } + break + } + if err != nil { + logger.Errorf(LoggerFmt+"error reading decompressed stream: %v", err) + break + } + } + + // On the very first successful read, record Time to First Token. + f.recordTTFTOnce.Do(func() { + ttft := time.Since(streamStartTime) + clusterName := "unknown" + if rEntry := hc.GetRouteEntry(); rEntry != nil { + clusterName = rEntry.Cluster + } + attrs := attribute.String(attrClusterName, clusterName) + + llmTimeToFirstTokenSum.Add(hc.Ctx, float64(ttft.Milliseconds()), attrs) + llmStreamingRequestsTotal.Add(hc.Ctx, 1, attrs) + }) Review Comment: The TTFT recording logic is placed after the stream processing loop, which means it will only execute after the entire stream is consumed. This defeats the purpose of measuring 'Time to First Token'. The TTFT should be recorded on the first successful read within the loop. 
```suggestion ``` ########## pkg/filter/llm/tokenizer/tokenizer.go: ########## @@ -24,138 +24,313 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" + "time" +) + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" Review Comment: The 'go.opentelemetry.io/otel/metric/instrument' package structure has changed in newer OpenTelemetry versions. Consider using the newer metric API structure. ```suggestion "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/global" ``` ########## pkg/filter/llm/tokenizer/tokenizer.go: ########## @@ -24,138 +24,313 @@ import ( "encoding/json" "fmt" "io" + "strconv" "strings" "sync" + "time" +) + +import ( + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncfloat64" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" ) import ( "github.com/apache/dubbo-go-pixiu/pkg/client" "github.com/apache/dubbo-go-pixiu/pkg/common/constant" "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter" contexthttp "github.com/apache/dubbo-go-pixiu/pkg/context/http" + "github.com/apache/dubbo-go-pixiu/pkg/filter/llm/proxy" "github.com/apache/dubbo-go-pixiu/pkg/logger" ) const ( + // General constants Kind = constant.LLMTokenizerFilter LoggerFmt = "[Tokenizer] [DOWNSTREAM] " PromptTokensDetails = "prompt_tokens_details" + + // Metric meter name + meterName = "dubbo-go-pixiu.ai-gateway" + + // Attribute Keys for metrics + attrClusterName = "cluster_name" + attrEndpointID = "endpoint_id" + attrEndpointAddress = "endpoint_address" + attrStatusCode = "status_code" + attrErrorType = "error_type" + attrModel = "model" + + // JSON Keys from LLM response + 
jsonKeyUsage = "usage" + jsonKeyModel = "model" + jsonKeyPromptTokens = "prompt_tokens" + jsonKeyCompletionTokens = "completion_tokens" + jsonKeyTotalTokens = "total_tokens" + + // Server-Sent Events (SSE) constants + sseDone = "[DONE]" + sseDataPrefix = "data:" + + // Metric Names + metricPromptTokens = "pixiu_llm_prompt_tokens_total" + metricCompletionTokens = "pixiu_llm_completion_tokens_total" + metricTotalTokens = "pixiu_llm_total_tokens_total" + metricUpstreamRequests = "pixiu_llm_upstream_requests_total" + metricUpstreamSuccess = "pixiu_llm_upstream_requests_success_total" + metricUpstreamFailure = "pixiu_llm_upstream_requests_failure_total" + metricTotalDurationSum = "pixiu_llm_total_duration_microseconds_sum_total" + metricTTFTSum = "pixiu_llm_time_to_first_token_milliseconds_sum_total" + metricStreamingRequests = "pixiu_llm_streaming_requests_total" +) + +var ( + // Metric Instruments + llmPromptTokens syncint64.Counter + llmCompletionTokens syncint64.Counter + llmTotalTokens syncint64.Counter + llmUpstreamRequestsTotal syncint64.Counter + llmUpstreamRequestsSuccessTotal syncint64.Counter + llmUpstreamRequestsFailureTotal syncint64.Counter + llmTotalDurationSum syncfloat64.Counter + llmTimeToFirstTokenSum syncfloat64.Counter + llmStreamingRequestsTotal syncint64.Counter ) func init() { filter.RegisterHttpFilter(&Plugin{}) } type ( - // Plugin is http filter plugin. 
- Plugin struct { - } - // FilterFactory is http filter instance - FilterFactory struct { - cfg *Config + Plugin struct{} + FilterFactory struct{ cfg *Config } + Filter struct { + cfg *Config + start time.Time + recordTTFTOnce sync.Once } - // Filter is http filter instance - Filter struct { - cfg *Config - } - // Config describes the config of FilterFactory Config struct { + LogToConsole bool `yaml:"log_to_console" json:"log_to_console,omitempty" default:"false"` } ) -func (p *Plugin) Kind() string { - return Kind -} +func (p *Plugin) Kind() string { return Kind } func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) { return &FilterFactory{cfg: &Config{}}, nil } -func (factory *FilterFactory) Config() any { - return factory.cfg -} +func (factory *FilterFactory) Config() any { return factory.cfg } -func (factory *FilterFactory) Apply() error { - return nil -} +func (factory *FilterFactory) Apply() error { return registerLLMMetrics() } func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, chain filter.FilterChain) error { - f := &Filter{ - cfg: factory.cfg, - } + f := &Filter{cfg: factory.cfg} + chain.AppendDecodeFilters(f) chain.AppendEncodeFilters(f) return nil } +func (f *Filter) Decode(hc *contexthttp.HttpContext) filter.FilterStatus { + f.start = time.Now() + return filter.Continue +} + func (f *Filter) Encode(hc *contexthttp.HttpContext) filter.FilterStatus { - encoding := hc.Writer.Header().Get(constant.HeaderKeyContentEncoding) + // Report all metrics from a central place + f.reportUpstreamMetrics(hc) + f.reportTotalDurationMetric(hc) + // Handle response body for token counting and TTFT + encoding := hc.Writer.Header().Get(constant.HeaderKeyContentEncoding) switch res := hc.TargetResp.(type) { case *client.StreamResponse: pr, pw := io.Pipe() res.Stream = newTeeReadCloser(res.Stream, pw) - go f.processStreamResponse(pr, encoding) + go f.processStreamResponse(hc, pr, encoding) case *client.UnaryResponse: - 
f.processUsageData(res.Data, encoding) // Unary response is not a stream + f.processUnaryResponse(hc, res.Data, encoding) default: - logger.Warnf(LoggerFmt+"Response type not suitable for token calc: %T", res) + logger.Warnf(LoggerFmt+"Response type not suitable for token calculation: %T", res) } - return filter.Continue } -func (f *Filter) processStreamResponse(body io.Reader, encoding string) { - // For streams, we decompress the entire stream first, then process its content. - // The content itself (with "data:" prefixes) is passed to processUsageData. - decompressedData, ok := decompress(body, encoding) +// reportUpstreamMetrics reads attempt data from context and reports endpoint-level metrics. +func (f *Filter) reportUpstreamMetrics(hc *contexthttp.HttpContext) { + attemptsVal, ok := hc.Params[proxy.LLMUpstreamAttemptsKey] + if !ok { + return // No attempt data found, likely not an LLM proxy request + } + attempts, ok := attemptsVal.([]proxy.UpstreamAttempt) if !ok { + logger.Warnf(LoggerFmt+"Upstream attempt data in context has wrong type: %T", attemptsVal) return } - decompressedDataTrim := strings.TrimPrefix(string(decompressedData), "data:") + for _, attempt := range attempts { + attrs := []attribute.KeyValue{ + attribute.String(attrClusterName, attempt.ClusterName), + attribute.String(attrEndpointID, attempt.EndpointID), + attribute.String(attrEndpointAddress, attempt.EndpointAddress), + } + + llmUpstreamRequestsTotal.Add(hc.Ctx, 1, attrs...) + + if attempt.Success { + successAttrs := append(attrs, attribute.String(attrStatusCode, strconv.Itoa(attempt.StatusCode))) + llmUpstreamRequestsSuccessTotal.Add(hc.Ctx, 1, successAttrs...) + } else { + failureAttrs := append(attrs, + attribute.String(attrStatusCode, strconv.Itoa(attempt.StatusCode)), + attribute.String(attrErrorType, attempt.ErrorType), + ) + llmUpstreamRequestsFailureTotal.Add(hc.Ctx, 1, failureAttrs...) 
+ } + } +} + +// reportTotalDurationMetric calculates and reports the total request-response duration. +func (f *Filter) reportTotalDurationMetric(hc *contexthttp.HttpContext) { + totalDuration := time.Since(f.start) + clusterName := "unknown" + if rEntry := hc.GetRouteEntry(); rEntry != nil { + clusterName = rEntry.Cluster + } + durationAttrs := []attribute.KeyValue{ + attribute.String(attrClusterName, clusterName), + attribute.String(attrStatusCode, strconv.Itoa(hc.GetStatusCode())), + } + llmTotalDurationSum.Add(hc.Ctx, float64(totalDuration.Microseconds()), durationAttrs...) +} + +// processStreamResponse handles streaming responses to calculate TTFT and count tokens. +func (f *Filter) processStreamResponse(hc *contexthttp.HttpContext, body io.Reader, encoding string) { + streamStartTime := time.Now() + decompressedReader, err := getDecompressedReader(body, encoding) + if err != nil { + logger.Errorf(LoggerFmt+"could not create decompressing reader: %v", err) + return + } + defer decompressedReader.Close() + + buf := make([]byte, 4096) Review Comment: [nitpick] The buffer size of 4096 bytes is a magic number. Consider defining it as a named constant for better maintainability. ```suggestion buf := make([]byte, streamBufferSize) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@dubbo.apache.org For additional commands, e-mail: notifications-h...@dubbo.apache.org