Copilot commented on code in PR #7894:
URL: https://github.com/apache/ignite-3/pull/7894#discussion_r3000929632
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java:
##########
@@ -667,24 +670,33 @@ private void writeHandshakeResponse(BitSet
mutuallySupportedFeatures, ClientMess
throw new UnsupportedAuthenticationTypeException("Unsupported
authentication type: " + authnType);
}
- private void writeAndFlush(ClientMessagePacker packer,
ChannelHandlerContext ctx) {
+ private boolean writeAndFlush(ClientMessagePacker packer,
ChannelHandlerContext ctx, ResponseWriteGuard guard) {
var buf = packer.getBuffer();
int bytes = buf.readableBytes();
try {
- // writeAndFlush releases pooled buffer.
- ctx.writeAndFlush(buf);
+ // write releases pooled buffer.
+ if (!guard.write(ctx, buf)) {
+ // Response for this request has already been sent.
+ // Example: exception after response write, catch block in
processOperation tries to write an error
+ // => duplicate response. Guard prevents this.
+ packer.close();
+ return false;
+ }
+
+ ctx.flush();
} catch (Throwable t) {
packer.close();
throw t;
}
metrics.bytesSentAdd(bytes);
+ return true;
}
- private void writeAndFlushWithMagic(ClientMessagePacker packer,
ChannelHandlerContext ctx) {
+ private void writeAndFlushWithMagic(ClientMessagePacker packer,
ChannelHandlerContext ctx, ResponseWriteGuard guard) {
ctx.write(Unpooled.wrappedBuffer(ClientMessageCommon.MAGIC_BYTES));
- writeAndFlush(packer, ctx);
+ writeAndFlush(packer, ctx, guard);
metrics.bytesSentAdd(ClientMessageCommon.MAGIC_BYTES.length);
}
Review Comment:
`writeAndFlushWithMagic` writes MAGIC_BYTES to the channel *before* checking
`ResponseWriteGuard`. If `guard.write(...)` rejects the actual payload write,
the magic buffer remains queued (and may be flushed by a later write),
producing a corrupted/partial response on the wire and also overcounting
`bytesSent`.
Guarding needs to be applied to the *entire* response (magic + payload)
atomically (e.g., check/flip the guard before any writes, or write a single
composite buffer containing magic + payload), and
`bytesSentAdd(MAGIC_BYTES.length)` should only happen when the response was
actually written.
##########
modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java:
##########
@@ -862,22 +884,33 @@ private void processOperation(ChannelHandlerContext ctx,
ClientMessageUnpacker i
partitionOperationsExecutor.execute(() -> {
try {
- processOperationInternal(ctx, in, requestId0, opCode0);
+ processOperationInternal(ctx, in, requestId0, opCode0,
guard);
} catch (Throwable t) {
in.close();
- writeError(requestId0, opCode0, t, ctx, false);
+ writeError(requestId0, opCode0, t, ctx, false, guard);
metrics.requestsFailedIncrement();
}
});
} else {
- processOperationInternal(ctx, in, requestId, opCode);
+ processOperationInternal(ctx, in, requestId, opCode, guard);
}
} catch (Throwable t) {
in.close();
- writeError(requestId, opCode, t, ctx, false);
+ // If we failed to read the request ID, we cannot send a valid
error response.
+ // Close the connection instead.
+ if (requestId == -1) {
+ LOG.warn("Failed to read client request, closing connection
[connectionId={}, remoteAddress={}]",
+ t, connectionId, ctx.channel().remoteAddress());
+
Review Comment:
In the `requestId == -1` error path, `processOperation` returns early after
`requestsActiveIncrement()` without a corresponding
`requestsActiveDecrement()`. This will leak the "active requests" metric for
malformed/partial messages.
Consider ensuring `requestsActiveDecrement()` is always executed on all exit
paths (e.g., decrement before returning here or restructure with a `finally`).
```suggestion
metrics.requestsActiveDecrement();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]