This is an automated email from the ASF dual-hosted git repository. liuhongyu pushed a commit to branch fix/fix_mcp_sse in repository https://gitbox.apache.org/repos/asf/shenyu.git
commit 508a37219ed6531f8d9fcac381fa112ae26d31d8 Author: liuhy <[email protected]> AuthorDate: Wed Feb 4 14:06:54 2026 +0800 fix: enhance logging and improve request handling in HTTP client plugins --- .../httpclient/AbstractHttpClientPlugin.java | 5 +++- .../plugin/httpclient/NettyHttpClientPlugin.java | 16 ++++++++++++- .../mcp/server/callback/ShenyuToolCallback.java | 28 ++++++++++++++++++---- .../NonCommittingMcpResponseDecorator.java | 10 ++++++++ 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java index 845585407d..c79816ab58 100644 --- a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/AbstractHttpClientPlugin.java @@ -74,7 +74,10 @@ public abstract class AbstractHttpClientPlugin<R> implements ShenyuPlugin { final int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0); final String retryStrategy = (String) Optional.ofNullable(exchange.getAttribute(Constants.RETRY_STRATEGY)).orElseGet(RetryEnum.CURRENT::getName); LogUtils.debug(LOG, () -> String.format("The request urlPath is: %s, retryTimes is : %s, retryStrategy is : %s", uri, retryTimes, retryStrategy)); - final Mono<R> response = doRequest(exchange, exchange.getRequest().getMethod().name(), uri, exchange.getRequest().getBody()) + final Mono<R> response = doRequest(exchange, + Objects.nonNull(exchange.getRequest().getMethod()) ? exchange.getRequest().getMethod().name() : "UNKNOWN", + uri, + exchange.getRequest().getBody()) .timeout(duration, Mono.error(() -> new TimeoutException("Response took longer than timeout: " + duration))) .doOnError(e -> LOG.error(e.getMessage(), e)); RetryStrategy<R> strategy; diff --git a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java index 0724329946..f8efe95964 100644 --- a/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java +++ b/shenyu-plugin/shenyu-plugin-httpclient/src/main/java/org/apache/shenyu/plugin/httpclient/NettyHttpClientPlugin.java @@ -23,6 +23,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.shenyu.common.constant.Constants; import org.apache.shenyu.common.enums.PluginEnum; import org.apache.shenyu.common.enums.UniqueHeaderEnum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.http.HttpHeaders; @@ -44,6 +46,8 @@ import java.util.Objects; */ public class NettyHttpClientPlugin extends AbstractHttpClientPlugin<HttpClientResponse> { + private static final Logger LOG = LoggerFactory.getLogger(NettyHttpClientPlugin.class); + private final HttpClient httpClient; /** @@ -61,6 +65,9 @@ public class NettyHttpClientPlugin extends AbstractHttpClientPlugin<HttpClientRe ServerHttpRequest request = exchange.getRequest(); final HttpHeaders httpHeaders = new HttpHeaders(request.getHeaders()); this.duplicateHeaders(exchange, httpHeaders, UniqueHeaderEnum.REQ_UNIQUE_HEADER); + if (LOG.isDebugEnabled()) { + LOG.debug("NettyHttpClient request: method={}, uri={}", httpMethod, uri); + } return Mono.from(httpClient.headers(headers -> { httpHeaders.forEach(headers::set); headers.remove(HttpHeaders.HOST); @@ -71,6 +78,9 @@ public class NettyHttpClientPlugin extends AbstractHttpClientPlugin<HttpClientRe }).request(HttpMethod.valueOf(httpMethod)).uri(uri.toASCIIString()) .send((req, nettyOutbound) -> nettyOutbound.send(body.map(dataBuffer -> ((NettyDataBuffer) dataBuffer).getNativeBuffer()))) .responseConnection((res, connection) -> { + if (LOG.isDebugEnabled()) { + LOG.debug("NettyHttpClient response: status={}", res.status().code()); + } exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, res); exchange.getAttributes().put(Constants.CLIENT_RESPONSE_CONN_ATTR, connection); final ServerHttpResponse response = exchange.getResponse(); @@ -89,7 +99,11 @@ public class NettyHttpClientPlugin extends AbstractHttpClientPlugin<HttpClientRe } else { throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass()); } - response.getHeaders().putAll(headers); + try { + response.getHeaders().putAll(headers); + } catch (UnsupportedOperationException ex) { + LOG.debug("Skip setting response headers because they are read-only: {}", ex.getMessage()); + } return Mono.just(res); })); } diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/callback/ShenyuToolCallback.java b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/callback/ShenyuToolCallback.java index e77649d809..a17b403234 100644 --- a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/callback/ShenyuToolCallback.java +++ b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/callback/ShenyuToolCallback.java @@ -215,11 +215,17 @@ public class ShenyuToolCallback implements ToolCallback { final String configStr, final String input) { + final RequestConfigHelper configHelper = new RequestConfigHelper(configStr); + final String toolMethod = configHelper.getMethod(); + final String toolUrl = configHelper.getUrlTemplate(); + final JsonObject requestTemplate = configHelper.getRequestTemplate(); + final String toolTimeout = requestTemplate.has("timeout") ? requestTemplate.get("timeout").getAsString() : "default"; final CompletableFuture<String> responseFuture = new CompletableFuture<>(); final ServerWebExchange decoratedExchange = buildDecoratedExchange( originExchange, responseFuture, sessionId, configStr, input); - LOG.debug("Executing plugin chain for session: {}", sessionId); + LOG.debug("Executing plugin chain for session: {} (method: {}, url: {}, timeoutMs: {})", + sessionId, toolMethod, toolUrl, toolTimeout); // Check if this is a temporary session that needs cleanup final boolean isTemporarySession = sessionId.startsWith("temp_"); @@ -233,7 +239,12 @@ public class ShenyuToolCallback implements ToolCallback { responseFuture.completeExceptionally(e); } }) - .doOnSuccess(v -> LOG.debug("Plugin chain completed successfully for session: {}", sessionId)) + .doOnSuccess(v -> { + LOG.debug("Plugin chain completed successfully for session: {}", sessionId); + if (!responseFuture.isDone()) { + responseFuture.complete(""); + } + }) .doOnCancel(() -> { LOG.warn("Plugin chain execution cancelled for session: {}", sessionId); if (!responseFuture.isDone()) { @@ -352,7 +363,11 @@ public class ShenyuToolCallback implements ToolCallback { final String path = RequestConfigHelper.buildPath(urlTemplate, argsPosition, inputJson); // Build body with parameter formatting (only format body parameters that need it) - final JsonObject bodyJson = buildFormattedBodyJson(argsToJsonBody, argsPosition, inputJson); + JsonObject bodyJson = buildFormattedBodyJson(argsToJsonBody, argsPosition, inputJson); + // Fallback: if argsToJsonBody is false but input has content for body methods, use the raw input as body + if (!argsToJsonBody && bodyJson.size() == 0 && isRequestBodyMethod(method) && inputJson.size() > 0) { + bodyJson = inputJson.deepCopy(); + } return new RequestConfig(method, path, bodyJson, requestTemplate, argsToJsonBody, inputJson); } @@ -603,8 +618,11 @@ public class ShenyuToolCallback implements ToolCallback { */ private ServerWebExchange handleRequestBody(final ServerWebExchange decoratedExchange, final RequestConfig requestConfig) { - if (isRequestBodyMethod(requestConfig.getMethod()) && requestConfig.getBodyJson().size() > 0) { - return new BodyWriterExchange(decoratedExchange, requestConfig.getBodyJson().toString()); + if (isRequestBodyMethod(requestConfig.getMethod())) { + String body = requestConfig.getBodyJson().size() > 0 + ? requestConfig.getBodyJson().toString() + : "{}"; + return new BodyWriterExchange(decoratedExchange, body); } return decoratedExchange; } diff --git a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/response/NonCommittingMcpResponseDecorator.java b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/response/NonCommittingMcpResponseDecorator.java index de39a2ab9e..233f07e77a 100644 --- a/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/response/NonCommittingMcpResponseDecorator.java +++ b/shenyu-plugin/shenyu-plugin-mcp-server/src/main/java/org/apache/shenyu/plugin/mcp/server/response/NonCommittingMcpResponseDecorator.java @@ -102,6 +102,16 @@ public class NonCommittingMcpResponseDecorator extends ServerHttpResponseDecorat .doOnError(error -> handleProcessingError("writeAndFlushWith", error)); } + @Override + public Mono<Void> setComplete() { + LOG.debug("Completing response for session: {}", sessionId); + if (!responseFuture.isDone()) { + responseFuture.complete(processResponse("")); + } + // Non-committing: do not write to the underlying response. + return Mono.empty(); + } + /** * Processes the collected response data buffers and completes the response future. *
