Copilot commented on code in PR #6055:
URL: https://github.com/apache/shenyu/pull/6055#discussion_r2212681534


##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                                 byte[] inBytes = new byte[ro.remaining()];
                                 ro.get(inBytes);
 
+                                byte[] processedBytes;
                                 if (isGzip) {
                                     int offset = 0;
-                                    int len = inBytes.length;
-                                    if (!headerSkipped.get()) {
+                                    if (!headerSkipped.getAndSet(true)) {
                                         offset = skipGzipHeader(inBytes);
-                                        headerSkipped.set(true);
                                     }
-                                    inflater.setInput(inBytes, offset, len - offset);
+                                    inflater.setInput(inBytes, offset, inBytes.length - offset);
+                                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                                     try {
                                         int cnt;
                                         while ((cnt = inflater.inflate(outBuf)) > 0) {
-                                            writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+                                            baos.write(outBuf, 0, cnt);
                                         }
                                     } catch (DataFormatException ex) {
-                                        LOG.error("inflater decompression failed", ex);
+                                        LOG.error("Inflater decompression failed", ex);
                                     }
+                                    processedBytes = baos.toByteArray();
                                 } else {
-                                    writer.write(ro);
+                                    processedBytes = inBytes;
+                                }
+                                String chunk = new String(processedBytes, StandardCharsets.UTF_8);
+                                for (String line : chunk.split("\\r?\\n")) {
+                                    if (!line.startsWith("data:")) {
+                                        continue;
+                                    }
+                                    String payload = line.substring("data:".length()).trim();
+                                    if (payload.isEmpty() || "[DONE]".equals(payload)) {
+                                        continue;
+                                    }
+                                    if (!payload.startsWith("{")) {
+                                        continue;
+                                    }
+                                    try {
+                                        JsonNode node = MAPPER.readTree(payload);
+                                        JsonNode usage = node.get(Constants.USAGE);
+                                        if (Objects.nonNull(usage) && usage.has(Constants.COMPLETION_TOKENS)) {
+                                            long c = usage.get(Constants.COMPLETION_TOKENS).asLong();
+                                            tokensRecorder.accept(c);
+                                            streamingUsageRecorded.set(true);
+                                        }
+                                    } catch (Exception e) {
+                                        LOG.error("parse ai resp error", e);
+                                    }
                                 }
+                                writer.write(ByteBuffer.wrap(processedBytes));
                             });
                         } catch (Exception e) {
                             LOG.error("read dataBuffer error", e);
                         }
                     })
                     .doFinally(signal -> {
-                        // release inflater
                         if (Objects.nonNull(inflater)) {
                             inflater.end();
                         }
-                        String responseBody = writer.output();
-                        AiModel aiModel = exchange.getAttribute(Constants.AI_MODEL);
-                        long tokens = Objects.requireNonNull(aiModel).getCompletionTokens(responseBody);
-                        tokensRecorder.accept(tokens);
+                        if (!streamingUsageRecorded.get()) {
+                            String sse = writer.output();
+                            long usageTokens = extractUsageTokensFromSse(sse);
+                            tokensRecorder.accept(usageTokens);
+                        }
                     });
         }
 
+        private long extractUsageTokensFromSse(final String sse) {
+            Pattern p = Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");
+            Matcher m = p.matcher(sse);

Review Comment:
   The regex pattern is compiled on every method call. Consider making the Pattern a static final field to improve performance.
   ```suggestion
           private static final Pattern COMPLETION_TOKENS_PATTERN = Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");

           private long extractUsageTokensFromSse(final String sse) {
               Matcher m = COMPLETION_TOKENS_PATTERN.matcher(sse);
   ```
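
   For context, a minimal standalone sketch of the suggested idiom (the class and method names are illustrative, and "keep the last match" is an assumed aggregation rule, not necessarily the PR's): Pattern instances are immutable and thread-safe, so hoisting the compiled pattern into a static final field is safe, while the stateful Matcher stays method-local.
   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   final class SseUsageParser {

       // Compiled once at class load; Pattern is immutable and thread-safe,
       // so sharing one instance avoids recompiling the regex on every call.
       private static final Pattern COMPLETION_TOKENS_PATTERN =
               Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");

       static long extractUsageTokens(final String sse) {
           // Matcher is stateful and not thread-safe, so it is created per call.
           Matcher m = COMPLETION_TOKENS_PATTERN.matcher(sse);
           long tokens = 0L;
           while (m.find()) {
               // Keep the last reported value (an assumption for this sketch;
               // streaming responses often emit cumulative usage in the final chunk).
               tokens = Long.parseLong(m.group(1));
           }
           return tokens;
       }
   }
   ```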



##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                                 byte[] inBytes = new byte[ro.remaining()];
                                 ro.get(inBytes);
 
+                                byte[] processedBytes;
                                 if (isGzip) {
                                     int offset = 0;
-                                    int len = inBytes.length;
-                                    if (!headerSkipped.get()) {
+                                    if (!headerSkipped.getAndSet(true)) {
                                         offset = skipGzipHeader(inBytes);
-                                        headerSkipped.set(true);
                                     }
-                                    inflater.setInput(inBytes, offset, len - offset);
+                                    inflater.setInput(inBytes, offset, inBytes.length - offset);
+                                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                                     try {
                                         int cnt;
                                         while ((cnt = inflater.inflate(outBuf)) > 0) {
-                                            writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+                                            baos.write(outBuf, 0, cnt);
                                         }
                                     } catch (DataFormatException ex) {
-                                        LOG.error("inflater decompression failed", ex);
+                                        LOG.error("Inflater decompression failed", ex);
                                     }
+                                    processedBytes = baos.toByteArray();
                                 } else {
-                                    writer.write(ro);
+                                    processedBytes = inBytes;
+                                }
+                                String chunk = new String(processedBytes, StandardCharsets.UTF_8);
+                                for (String line : chunk.split("\\r?\\n")) {
+                                    if (!line.startsWith("data:")) {
+                                        continue;
+                                    }
+                                    String payload = line.substring("data:".length()).trim();
+                                    if (payload.isEmpty() || "[DONE]".equals(payload)) {
+                                        continue;
+                                    }
+                                    if (!payload.startsWith("{")) {
+                                        continue;
+                                    }
+                                    try {
+                                        JsonNode node = MAPPER.readTree(payload);
+                                        JsonNode usage = node.get(Constants.USAGE);
+                                        if (Objects.nonNull(usage) && usage.has(Constants.COMPLETION_TOKENS)) {
+                                            long c = usage.get(Constants.COMPLETION_TOKENS).asLong();
+                                            tokensRecorder.accept(c);
+                                            streamingUsageRecorded.set(true);
+                                        }
+                                    } catch (Exception e) {
+                                        LOG.error("parse ai resp error", e);
+                                    }
                                 }
+                                writer.write(ByteBuffer.wrap(processedBytes));
                             });
                         } catch (Exception e) {
                             LOG.error("read dataBuffer error", e);

Review Comment:
   Catching generic Exception is too broad. Consider catching specific exceptions like JsonProcessingException or IOException to handle different error scenarios appropriately.
   ```suggestion
                                       } catch (JsonProcessingException e) {
                                           LOG.error("JSON processing error while parsing AI response", e);
                                       } catch (IOException e) {
                                           LOG.error("IO error while parsing AI response", e);
                                       }
                                   }
                                   writer.write(ByteBuffer.wrap(processedBytes));
                               });
                           } catch (IOException e) {
                               LOG.error("IO error while reading dataBuffer", e);
   ```
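
   One detail worth noting (a fact about Jackson 2.x, not about this PR): JsonProcessingException extends IOException, so the more specific catch block must come first, exactly as ordered in the suggestion. A minimal sketch of the narrowed handling, with illustrative class and method names:
   ```java
   import com.fasterxml.jackson.core.JsonProcessingException;
   import com.fasterxml.jackson.databind.JsonNode;
   import com.fasterxml.jackson.databind.ObjectMapper;

   final class PayloadParser {

       private static final ObjectMapper MAPPER = new ObjectMapper();

       // Returns the parsed tree, or null when the SSE data line is not valid
       // JSON. readTree(String) declares JsonProcessingException, which extends
       // IOException, so this single catch covers malformed input.
       static JsonNode parseOrNull(final String payload) {
           try {
               return MAPPER.readTree(payload);
           } catch (JsonProcessingException e) {
               // Malformed chunk; skip it rather than abort the stream.
               return null;
           }
       }
   }
   ```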



##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                                 byte[] inBytes = new byte[ro.remaining()];
                                 ro.get(inBytes);
 
+                                byte[] processedBytes;
                                 if (isGzip) {
                                     int offset = 0;
-                                    int len = inBytes.length;
-                                    if (!headerSkipped.get()) {
+                                    if (!headerSkipped.getAndSet(true)) {
                                         offset = skipGzipHeader(inBytes);
-                                        headerSkipped.set(true);
                                     }
-                                    inflater.setInput(inBytes, offset, len - offset);
+                                    inflater.setInput(inBytes, offset, inBytes.length - offset);
+                                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
                                     try {
                                         int cnt;
                                         while ((cnt = inflater.inflate(outBuf)) > 0) {
-                                            writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+                                            baos.write(outBuf, 0, cnt);
                                         }
                                     } catch (DataFormatException ex) {
-                                        LOG.error("inflater decompression failed", ex);
+                                        LOG.error("Inflater decompression failed", ex);
                                     }
+                                    processedBytes = baos.toByteArray();
                                 } else {
-                                    writer.write(ro);
+                                    processedBytes = inBytes;
+                                }
+                                String chunk = new String(processedBytes, StandardCharsets.UTF_8);
+                                for (String line : chunk.split("\\r?\\n")) {
+                                    if (!line.startsWith("data:")) {
+                                        continue;
+                                    }
+                                    String payload = line.substring("data:".length()).trim();
+                                    if (payload.isEmpty() || "[DONE]".equals(payload)) {
+                                        continue;
+                                    }
+                                    if (!payload.startsWith("{")) {
+                                        continue;
+                                    }
+                                    try {
+                                        JsonNode node = MAPPER.readTree(payload);
+                                        JsonNode usage = node.get(Constants.USAGE);
+                                        if (Objects.nonNull(usage) && usage.has(Constants.COMPLETION_TOKENS)) {
+                                            long c = usage.get(Constants.COMPLETION_TOKENS).asLong();
+                                            tokensRecorder.accept(c);
+                                            streamingUsageRecorded.set(true);
+                                        }
+                                    } catch (Exception e) {
+                                        LOG.error("parse ai resp error", e);

Review Comment:
   The error message 'parse ai resp error' is unclear and uses abbreviations. Consider a more descriptive message like 'Failed to parse AI response JSON payload'.
   ```suggestion
                                           LOG.error("Failed to parse AI response JSON payload", e);
   ```



##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                                 byte[] inBytes = new byte[ro.remaining()];
                                 ro.get(inBytes);
 
+                                byte[] processedBytes;
                                 if (isGzip) {
                                     int offset = 0;
-                                    int len = inBytes.length;
-                                    if (!headerSkipped.get()) {
+                                    if (!headerSkipped.getAndSet(true)) {

Review Comment:
   Using getAndSet(true) in a conditional check can lead to race conditions. Consider using a more explicit atomic operation or synchronization mechanism to ensure thread safety.
   ```suggestion
                                       if (headerSkipped.compareAndSet(false, true)) {
   ```
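
   For reference, a minimal sketch of the one-shot flag idiom (the class and method names are illustrative): both getAndSet(true) and compareAndSet(false, true) are atomic, so either form admits exactly one winner across racing threads; compareAndSet states the expected prior value explicitly and skips the redundant write once the flag is already set.
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   final class GzipHeaderState {

       private final AtomicBoolean headerSkipped = new AtomicBoolean(false);

       // compareAndSet(false, true) returns true exactly once: the first
       // caller flips the flag and wins; every later caller sees true and
       // gets false back, with no lock and no extra write.
       boolean shouldSkipHeader() {
           return headerSkipped.compareAndSet(false, true);
       }
   }
   ```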


