Copilot commented on code in PR #6055:
URL: https://github.com/apache/shenyu/pull/6055#discussion_r2212681534
##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########

@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                     byte[] inBytes = new byte[ro.remaining()];
                     ro.get(inBytes);
+                    byte[] processedBytes;
                     if (isGzip) {
                         int offset = 0;
-                        int len = inBytes.length;
-                        if (!headerSkipped.get()) {
+                        if (!headerSkipped.getAndSet(true)) {
                             offset = skipGzipHeader(inBytes);
-                            headerSkipped.set(true);
                         }
-                        inflater.setInput(inBytes, offset, len - offset);
+                        inflater.setInput(inBytes, offset, inBytes.length - offset);
+                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                         try {
                             int cnt;
                             while ((cnt = inflater.inflate(outBuf)) > 0) {
-                                writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+                                baos.write(outBuf, 0, cnt);
                             }
                         } catch (DataFormatException ex) {
-                            LOG.error("inflater decompression failed", ex);
+                            LOG.error("Inflater decompression failed", ex);
                         }
+                        processedBytes = baos.toByteArray();
                     } else {
-                        writer.write(ro);
+                        processedBytes = inBytes;
+                    }
+                    String chunk = new String(processedBytes, StandardCharsets.UTF_8);
+                    for (String line : chunk.split("\\r?\\n")) {
+                        if (!line.startsWith("data:")) {
+                            continue;
+                        }
+                        String payload = line.substring("data:".length()).trim();
+                        if (payload.isEmpty() || "[DONE]".equals(payload)) {
+                            continue;
+                        }
+                        if (!payload.startsWith("{")) {
+                            continue;
+                        }
+                        try {
+                            JsonNode node = MAPPER.readTree(payload);
+                            JsonNode usage = node.get(Constants.USAGE);
+                            if (Objects.nonNull(usage) && usage.has(Constants.COMPLETION_TOKENS)) {
+                                long c = usage.get(Constants.COMPLETION_TOKENS).asLong();
+                                tokensRecorder.accept(c);
+                                streamingUsageRecorded.set(true);
+                            }
+                        } catch (Exception e) {
+                            LOG.error("parse ai resp error", e);
+                        }
                     }
+                    writer.write(ByteBuffer.wrap(processedBytes));
                 });
             } catch (Exception e) {
                 LOG.error("read dataBuffer error", e);
             }
         })
         .doFinally(signal -> {
-            // release inflater
             if (Objects.nonNull(inflater)) {
                 inflater.end();
             }
-            String responseBody = writer.output();
-            AiModel aiModel = exchange.getAttribute(Constants.AI_MODEL);
-            long tokens = Objects.requireNonNull(aiModel).getCompletionTokens(responseBody);
-            tokensRecorder.accept(tokens);
+            if (!streamingUsageRecorded.get()) {
+                String sse = writer.output();
+                long usageTokens = extractUsageTokensFromSse(sse);
+                tokensRecorder.accept(usageTokens);
+            }
         });
     }
 
+    private long extractUsageTokensFromSse(final String sse) {
+        Pattern p = Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");
+        Matcher m = p.matcher(sse);

Review Comment:
The regex pattern is compiled on every method call. Consider making the Pattern a static final field to improve performance.
```suggestion
    private static final Pattern COMPLETION_TOKENS_PATTERN = Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");

    private long extractUsageTokensFromSse(final String sse) {
        Matcher m = COMPLETION_TOKENS_PATTERN.matcher(sse);
```
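For reference, here is a self-contained sketch of the hoisted-pattern idiom the suggestion points at. The quoted hunk ends at the `Matcher` creation, so the scan-and-return logic below (keeping the last `completion_tokens` value seen in the stream) is an assumed completion of the helper, not the PR's actual code:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SseUsageExtractor {

    // Compiled once at class-load time instead of on every call.
    private static final Pattern COMPLETION_TOKENS_PATTERN =
            Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");

    // Assumed body: the hunk stops at matcher creation, so taking the
    // last matched value (the final usage event in an SSE stream) is a guess.
    public long extractUsageTokensFromSse(final String sse) {
        long tokens = 0L;
        Matcher m = COMPLETION_TOKENS_PATTERN.matcher(sse);
        while (m.find()) {
            tokens = Long.parseLong(m.group(1));
        }
        return tokens;
    }

    public static void main(String[] args) {
        String sse = "data: {\"usage\":{\"completion_tokens\":42}}\n\ndata: [DONE]\n";
        System.out.println(new SseUsageExtractor().extractUsageTokensFromSse(sse)); // 42
    }
}
```

`Pattern.compile` does the regex parsing and automaton construction up front, so hoisting it to a `static final` field pays that cost once. `Pattern` instances are immutable and thread-safe, so one shared instance is safe across requests; each call still gets its own `Matcher`, which is not thread-safe.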
##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########

@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                     byte[] inBytes = new byte[ro.remaining()];
                     ro.get(inBytes);
+                    byte[] processedBytes;
                     if (isGzip) {
                         int offset = 0;
-                        int len = inBytes.length;
-                        if (!headerSkipped.get()) {
+                        if (!headerSkipped.getAndSet(true)) {
                             offset = skipGzipHeader(inBytes);
-                            headerSkipped.set(true);
                         }
-                        inflater.setInput(inBytes, offset, len - offset);
+                        inflater.setInput(inBytes, offset, inBytes.length - offset);
+                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                         try {
                             int cnt;
                             while ((cnt = inflater.inflate(outBuf)) > 0) {
-                                writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+                                baos.write(outBuf, 0, cnt);
                             }
                         } catch (DataFormatException ex) {
-                            LOG.error("inflater decompression failed", ex);
+                            LOG.error("Inflater decompression failed", ex);
                         }
+                        processedBytes = baos.toByteArray();
                     } else {
-                        writer.write(ro);
+                        processedBytes = inBytes;
+                    }
+                    String chunk = new String(processedBytes, StandardCharsets.UTF_8);
+                    for (String line : chunk.split("\\r?\\n")) {
+                        if (!line.startsWith("data:")) {
+                            continue;
+                        }
+                        String payload = line.substring("data:".length()).trim();
+                        if (payload.isEmpty() || "[DONE]".equals(payload)) {
+                            continue;
+                        }
+                        if (!payload.startsWith("{")) {
+                            continue;
+                        }
+                        try {
+                            JsonNode node = MAPPER.readTree(payload);
+                            JsonNode usage = node.get(Constants.USAGE);
+                            if (Objects.nonNull(usage) && usage.has(Constants.COMPLETION_TOKENS)) {
+                                long c = usage.get(Constants.COMPLETION_TOKENS).asLong();
+                                tokensRecorder.accept(c);
+                                streamingUsageRecorded.set(true);
+                            }
+                        } catch (Exception e) {
+                            LOG.error("parse ai resp error", e);
+                        }
                     }
+                    writer.write(ByteBuffer.wrap(processedBytes));
                 });
             } catch (Exception e) {
                 LOG.error("read dataBuffer error", e);

Review Comment:
Catching generic Exception is too broad. Consider catching specific exceptions like JsonProcessingException or IOException to handle different error scenarios appropriately.
```suggestion
                        } catch (JsonProcessingException e) {
                            LOG.error("JSON processing error while parsing AI response", e);
                        } catch (IOException e) {
                            LOG.error("IO error while parsing AI response", e);
                        }
                    }
                    writer.write(ByteBuffer.wrap(processedBytes));
                });
            } catch (IOException e) {
                LOG.error("IO error while reading dataBuffer", e);
```

##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########

@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                     byte[] inBytes = new byte[ro.remaining()];
                     ro.get(inBytes);
+                    byte[] processedBytes;
                     if (isGzip) {
                         int offset = 0;
-                        int len = inBytes.length;
-                        if (!headerSkipped.get()) {
+                        if (!headerSkipped.getAndSet(true)) {
                             offset = skipGzipHeader(inBytes);
-                            headerSkipped.set(true);
                         }
-                        inflater.setInput(inBytes, offset, len - offset);
+                        inflater.setInput(inBytes, offset, inBytes.length - offset);
+                        ByteArrayOutputStream baos = new ByteArrayOutputStream();
                         try {
                             int cnt;
                             while ((cnt = inflater.inflate(outBuf)) > 0) {
-                                writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+                                baos.write(outBuf, 0, cnt);
                             }
                         } catch (DataFormatException ex) {
-                            LOG.error("inflater decompression failed", ex);
+                            LOG.error("Inflater decompression failed", ex);
                         }
+                        processedBytes = baos.toByteArray();
                     } else {
-                        writer.write(ro);
+                        processedBytes = inBytes;
+                    }
+                    String chunk = new String(processedBytes, StandardCharsets.UTF_8);
+                    for (String line : chunk.split("\\r?\\n")) {
+                        if (!line.startsWith("data:")) {
+                            continue;
+                        }
+                        String payload = line.substring("data:".length()).trim();
+                        if (payload.isEmpty() || "[DONE]".equals(payload)) {
+                            continue;
+                        }
+                        if (!payload.startsWith("{")) {
+                            continue;
+                        }
+                        try {
+                            JsonNode node = MAPPER.readTree(payload);
+                            JsonNode usage = node.get(Constants.USAGE);
+                            if (Objects.nonNull(usage) && usage.has(Constants.COMPLETION_TOKENS)) {
+                                long c = usage.get(Constants.COMPLETION_TOKENS).asLong();
+                                tokensRecorder.accept(c);
+                                streamingUsageRecorded.set(true);
+                            }
+                        } catch (Exception e) {
+                            LOG.error("parse ai resp error", e);

Review Comment:
The error message 'parse ai resp error' is unclear and uses abbreviations. Consider a more descriptive message like 'Failed to parse AI response JSON payload'.
```suggestion
                            LOG.error("Failed to parse AI response JSON payload", e);
```

##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########

@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final Publisher<? extends Data
                     byte[] inBytes = new byte[ro.remaining()];
                     ro.get(inBytes);
+                    byte[] processedBytes;
                     if (isGzip) {
                         int offset = 0;
-                        int len = inBytes.length;
-                        if (!headerSkipped.get()) {
+                        if (!headerSkipped.getAndSet(true)) {

Review Comment:
Using getAndSet(true) in a conditional check can lead to race conditions. Consider using a more explicit atomic operation or synchronization mechanism to ensure thread safety.
```suggestion
                        if (headerSkipped.compareAndSet(false, true)) {
```
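As an aside on this last suggestion, here is a minimal runnable sketch of the `compareAndSet(false, true)` one-shot guard; the parallel-stream harness is purely illustrative and not part of the plugin:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class OneShotGuardDemo {

    public static void main(String[] args) {
        AtomicBoolean headerSkipped = new AtomicBoolean(false);
        AtomicInteger skips = new AtomicInteger();

        // Many concurrent "chunks" race to be first; compareAndSet(false, true)
        // succeeds for exactly one caller, so the guarded branch runs once.
        IntStream.range(0, 1_000).parallel().forEach(i -> {
            if (headerSkipped.compareAndSet(false, true)) {
                skips.incrementAndGet(); // stands in for skipGzipHeader(...)
            }
        });

        System.out.println("header skipped " + skips.get() + " time(s)"); // always 1
    }
}
```

Both `!getAndSet(true)` and `compareAndSet(false, true)` are single atomic read-modify-write operations on `AtomicBoolean`; the `compareAndSet` form states the intended false-to-true transition explicitly, which is why it reads more clearly as a first-caller-wins guard.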
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@shenyu.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org