Re: [PR] Fix some SSE netty usage issues [pinot]
gortiz closed pull request #17667: Fix some SSE netty usage issues URL: https://github.com/apache/pinot/pull/17667 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] Fix some SSE netty usage issues [pinot]
gortiz commented on PR #17667: URL: https://github.com/apache/pinot/pull/17667#issuecomment-3891210684 Already fixed in https://github.com/apache/pinot/pull/17684
Re: [PR] Fix some SSE netty usage issues [pinot]
gortiz commented on code in PR #17667:
URL: https://github.com/apache/pinot/pull/17667#discussion_r2783842155
##
pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java:
##
@@ -92,12 +92,18 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
       LOGGER.error("Caught exception while deserializing data table of size: {} from server: {}", responseSize,
           _serverRoutingInstance, e);
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.DATA_TABLE_DESERIALIZATION_EXCEPTIONS, 1);
+      // Propagate so the pipeline closes the channel and channelInactive runs (markServerDown), otherwise
+      // the query would hang waiting for a response that will never be delivered.
+      ctx.fireExceptionCaught(e);
     }
   }

   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     LOGGER.error("Caught exception while handling response from server: {}", _serverRoutingInstance, cause);
     _brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESPONSE_FETCH_EXCEPTIONS, 1);
+    // Propagate so the pipeline can close the channel and release resources; otherwise the channel
+    // may be left in an inconsistent state and leak.
+    ctx.fireExceptionCaught(cause);
Review Comment:
In case we run out of memory upstream, this method is called. We need to
ensure that memory is released:
```
ERROR [DataTableHandler] [nioEventLoopGroup-x-y] Caught exception while handling response from server: ZZ
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate X byte(s) of direct memory (used: Y, max: Z)
  at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:880)
  at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:809)
  at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:718)
  at io.netty.buffer.PoolArena$DirectArena.newUnpooledChunk(PoolArena.java:707)
  at io.netty.buffer.PoolArena.allocateHuge(PoolArena.java:224)
  at io.netty.buffer.PoolArena.allocate(PoolArena.java:142)
  at io.netty.buffer.PoolArena.reallocate(PoolArena.java:317)
  at io.netty.buffer.PooledByteBuf.capacity(PooledByteBuf.java:123)
  at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:305)
  at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:280)
  at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1103)
  at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:105)
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:288)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
  at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1519)
  at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1377)
  at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1428)
  at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)
  at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java
```
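The reviewer's point, that the handler must ensure memory is released when this error surfaces, can be modeled without Netty. Below is a minimal sketch with toy stand-in types (`RefCountedBuf` and `OomReleaseSketch` are illustrative names, not Pinot or Netty classes); real Netty `ByteBuf`s use the same reference-counting idea, where direct memory is returned to the pool only when the count reaches zero:

```java
// Toy reference-counted buffer standing in for Netty's ByteBuf.
class RefCountedBuf {
    private int refCnt = 1;

    int refCnt() {
        return refCnt;
    }

    void release() {
        // Memory is returned to the allocator only when the count hits 0.
        if (refCnt > 0) {
            refCnt--;
        }
    }
}

// Minimal handler model: on exceptionCaught it releases any buffer it still
// holds (e.g. a partially cumulated frame) and closes the channel; otherwise
// the allocator's used-direct-memory counter never goes back down.
public class OomReleaseSketch {
    RefCountedBuf cumulation = new RefCountedBuf();
    boolean channelClosed = false;

    void exceptionCaught(Throwable cause) {
        if (cumulation != null) {
            cumulation.release();  // give the direct memory back
            cumulation = null;
        }
        channelClosed = true;  // closing also releases socket-level resources
    }

    public static void main(String[] args) {
        OomReleaseSketch handler = new OomReleaseSketch();
        RefCountedBuf buf = handler.cumulation;
        handler.exceptionCaught(new Error("simulated OutOfDirectMemoryError"));
        System.out.println("refCnt=" + buf.refCnt() + " closed=" + handler.channelClosed);
    }
}
```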
Re: [PR] Fix some SSE netty usage issues [pinot]
gortiz commented on code in PR #17667:
URL: https://github.com/apache/pinot/pull/17667#discussion_r2783815287
##
pinot-core/src/main/java/org/apache/pinot/core/transport/DataTableHandler.java:
##
@@ -92,12 +92,18 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
       LOGGER.error("Caught exception while deserializing data table of size: {} from server: {}", responseSize,
           _serverRoutingInstance, e);
       _brokerMetrics.addMeteredGlobalValue(BrokerMeter.DATA_TABLE_DESERIALIZATION_EXCEPTIONS, 1);
+      // Propagate so the pipeline closes the channel and channelInactive runs (markServerDown), otherwise
+      // the query would hang waiting for a response that will never be delivered.
+      ctx.fireExceptionCaught(e);
Review Comment:
Maybe instead of closing the channel here we want to tell the router that the query has finished incorrectly. But we need to do something beyond dropping the message we weren't able to parse.
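Whichever option is chosen (closing the channel or notifying the router), the mechanism under discussion is that an unhandled exception must travel down the pipeline instead of being swallowed. Here is a tiny plain-Java model of why `ctx.fireExceptionCaught` matters; no Netty is used, and `PipelineSketch` and its `Handler` are illustrative names, not Pinot classes:

```java
import java.util.List;

// Plain-Java model of a Netty-style pipeline: an exception either reaches
// the tail (which closes the channel, so channelInactive/markServerDown can
// run) or is swallowed by a handler, leaving the channel open and the query
// hanging forever.
public class PipelineSketch {
    interface Handler {
        void exceptionCaught(PipelineSketch pipeline, int index, Throwable cause);
    }

    final List<Handler> handlers;
    boolean channelClosed = false;

    PipelineSketch(List<Handler> handlers) {
        this.handlers = handlers;
    }

    // Analogous to ctx.fireExceptionCaught: hand the exception to the next
    // handler, or let the implicit tail handler close the channel.
    void fireExceptionCaught(int fromIndex, Throwable cause) {
        int next = fromIndex + 1;
        if (next < handlers.size()) {
            handlers.get(next).exceptionCaught(this, next, cause);
        } else {
            channelClosed = true;  // tail: close channel, trigger channelInactive
        }
    }

    public static void main(String[] args) {
        // Old behavior: handler logs and drops the exception; channel stays open.
        Handler swallowing = (pipeline, index, cause) -> { /* log and drop */ };
        PipelineSketch broken = new PipelineSketch(List.of(swallowing));
        broken.fireExceptionCaught(-1, new RuntimeException("bad frame"));
        System.out.println("swallowed: closed=" + broken.channelClosed);

        // Fixed behavior: handler propagates; the tail closes the channel.
        Handler propagating = (pipeline, index, cause) -> pipeline.fireExceptionCaught(index, cause);
        PipelineSketch fixed = new PipelineSketch(List.of(propagating));
        fixed.fireExceptionCaught(-1, new RuntimeException("bad frame"));
        System.out.println("propagated: closed=" + fixed.channelClosed);
    }
}
```

The same shape explains the hang described in the diff comment: with the swallowing handler nothing downstream ever observes the failure, so the broker keeps waiting for a response that will never arrive.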
Re: [PR] Fix some SSE netty usage issues [pinot]
codecov-commenter commented on PR #17667: URL: https://github.com/apache/pinot/pull/17667#issuecomment-3871935642

## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/17667) Report

:x: Patch coverage is `83.3%` with `3 lines` in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 55.57%. Comparing base [`4ef8e3e`](https://app.codecov.io/gh/apache/pinot/commit/4ef8e3eafd14a3c656689afe910bc3a3c615773d) to head [`278effc`](https://app.codecov.io/gh/apache/pinot/commit/278effc5de02cd36438c4334dd274c92937be75a).
:warning: Report is 1 commit behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| [...e/pinot/core/transport/InstanceRequestHandler.java](https://app.codecov.io/gh/apache/pinot/pull/17667?src=pr&el=tree&filepath=pinot-core%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcore%2Ftransport%2FInstanceRequestHandler.java) | 0.00% | 2 Missing :warning: |
| [.../apache/pinot/core/transport/DataTableHandler.java](https://app.codecov.io/gh/apache/pinot/pull/17667?src=pr&el=tree&filepath=pinot-core%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpinot%2Fcore%2Ftransport%2FDataTableHandler.java) | 50.00% | 1 Missing :warning: |

:exclamation: There is a different number of reports uploaded between BASE (4ef8e3e) and HEAD (278effc): HEAD has 36 fewer uploads than BASE.

| Flag | BASE (4ef8e3e) | HEAD (278effc) |
|--|--|--|
| java-21 | 5 | 1 |
| unittests1 | 2 | 1 |
| unittests | 4 | 1 |
| temurin | 10 | 1 |
| java-11 | 5 | 0 |
| unittests2 | 2 | 0 |
| integration | 6 | 0 |
| integration2 | 2 | 0 |
| integration1 | 2 | 0 |
| custom-integration1 | 2 | 0 |

Additional details and impacted files:

```diff
@@             Coverage Diff              @@
##             master   #17667      +/-  ##
- Coverage     63.25%   55.57%   -7.68%
+ Complexity     1499      720     -779
  Files          3174     2475     -699
  Lines        190323   140253  -50070
  Branches      29080    22341   -6739
- Hits         120381    77947  -42434
+ Misses        60606    55730   -4876
+ Partials       9336     6576   -2760
```
Re: [PR] Fix some SSE netty usage issues [pinot]
gortiz commented on code in PR #17667:
URL: https://github.com/apache/pinot/pull/17667#discussion_r2782615298
##
pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java:
##
@@ -70,6 +70,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
     if (_silentShutDown) {
       return;
     }
+    // Remove from _allChannels on server side so we don't leak channel references when brokers disconnect
+    if (_allChannels != null) {
+      _allChannels.remove(ctx.channel());
+    }
Review Comment:
That is ok. If we are in the middle of a shutdown, the JVM process will finish anyway.
Re: [PR] Fix some SSE netty usage issues [pinot]
Copilot commented on code in PR #17667:
URL: https://github.com/apache/pinot/pull/17667#discussion_r2782389263
##
pinot-core/src/main/java/org/apache/pinot/core/transport/DirectOOMHandler.java:
##
@@ -70,6 +70,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
     if (_silentShutDown) {
       return;
     }
+    // Remove from _allChannels on server side so we don't leak channel references when brokers disconnect
+    if (_allChannels != null) {
+      _allChannels.remove(ctx.channel());
+    }
Review Comment:
The early return on `_silentShutDown` prevents removing the channel from
`_allChannels`, which can still leak channel references in the silent shutdown
path. Move the `_allChannels.remove(ctx.channel())` block before the
`_silentShutDown` check (or ensure silent shutdown performs equivalent cleanup).
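Copilot's suggested ordering can be shown with a small stand-alone model (plain Java; `ChannelCleanupSketch` and its string channel ids are illustrative stand-ins, not the actual `DirectOOMHandler`): hoisting the removal above the early return means both the normal path and the silent-shutdown path drop the channel reference.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-alone model of channelInactive with the cleanup hoisted above the
// _silentShutDown early return. Channel ids are plain strings here; the set
// stands in for the _allChannels ChannelGroup.
public class ChannelCleanupSketch {
    final Set<String> allChannels = new HashSet<>();
    private boolean silentShutDown = false;
    int serverDownCount = 0;  // stand-in for markServerDown notifications

    void register(String channelId) {
        allChannels.add(channelId);
    }

    void setSilentShutDown() {
        silentShutDown = true;
    }

    void channelInactive(String channelId) {
        // Cleanup first: runs on every path, so no reference leaks.
        allChannels.remove(channelId);
        if (silentShutDown) {
            return;  // skip notifications, but the reference is already gone
        }
        serverDownCount++;
    }

    public static void main(String[] args) {
        ChannelCleanupSketch silent = new ChannelCleanupSketch();
        silent.register("broker-1");
        silent.setSilentShutDown();
        silent.channelInactive("broker-1");
        // Silent path: no notification fired, but also no leaked reference.
        System.out.println(silent.allChannels.isEmpty() + " " + silent.serverDownCount);
    }
}
```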
##
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java:
##
@@ -120,6 +120,11 @@ public void start() {
     PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
     PooledByteBufAllocatorMetric metric = bufAllocator.metric();
     ServerMetrics metrics = ServerMetrics.get();
+    // Notice here we assume there is a single QueryServer per JVM. If that is not true (ie quickstarts with multiple
+    // servers), we:
+    // 1. Will have one allocator per server, which may cause higher memory usage, and may cause OOM.
+    // 2. Will have multiple sets of gauges for each allocator, which means the last one will win and override the
Review Comment:
This new comment warns about a problematic multi-server-in-one-JVM behavior,
but the code continues to create/register per `start()`. Either enforce the
intended singleton behavior (shared allocator + one-time gauge registration) or
reword the comment to avoid implying the method already mitigates the issue.
```suggestion
    // Note: this method does not enforce a single QueryServer per JVM. If multiple servers are started in the same
    // JVM (e.g., quickstarts with multiple servers), each call to start() will:
    // 1. Create a separate allocator per server, which may cause higher memory usage and may lead to OOM.
    // 2. Register a separate set of gauges for each allocator, where the last registration will win and override the
```
##
pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java:
##
@@ -85,6 +85,7 @@ public class ServerChannels {
   private final BrokerMetrics _brokerMetrics = BrokerMetrics.get();
   private final ConcurrentHashMap _serverToChannelMap = new ConcurrentHashMap<>();
+  private final PooledByteBufAllocator _bufAllocatorWithLimits;
Review Comment:
The comment explicitly calls out multi-broker-in-one-JVM scenarios, but the
implementation still constructs one allocator per `ServerChannels` instance and
re-registers global gauges (last one wins). If multi-broker JVMs are expected
(e.g., quickstarts), consider making the allocator (and gauge registration)
truly JVM-singleton (e.g., static lazy holder + one-time registration guard),
or adjust the comment to reflect the actual scope (per-ServerChannels, not
per-JVM).
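The "static lazy holder + one-time registration guard" suggested here is the standard initialization-on-demand holder idiom. A hedged sketch follows; the `BufferAllocator` type and the `registrations` counter are stand-ins for illustration, not Netty's `PooledByteBufAllocator` or Pinot's metrics API:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Initialization-on-demand holder: the JVM's class-initialization semantics
// guarantee the allocator is created exactly once, thread-safely, on first
// access, and the gauge registration guard runs exactly once with it.
public class SharedAllocatorSketch {
    static final AtomicInteger registrations = new AtomicInteger();

    static class BufferAllocator {}  // stand-in for a pooled allocator

    private static class Holder {
        static final BufferAllocator INSTANCE = create();

        static BufferAllocator create() {
            BufferAllocator alloc = new BufferAllocator();
            // One-time gauge registration would go here, guarded by the
            // fact that this initializer can only ever run once.
            registrations.incrementAndGet();
            return alloc;
        }
    }

    static BufferAllocator sharedAllocator() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        // Multiple "brokers"/"servers" in one JVM see the same allocator,
        // and gauges are registered exactly once.
        BufferAllocator a = sharedAllocator();
        BufferAllocator b = sharedAllocator();
        System.out.println((a == b) + " " + registrations.get());  // prints "true 1"
    }
}
```

The holder idiom avoids both problems the review flags: no per-instance allocator (so no extra direct-memory pools), and no repeated gauge registration (so no "last one wins" metric overwrites).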
##
pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java:
##
@@ -125,6 +126,28 @@ public ServerChannels(QueryRouter queryRouter, @Nullable NettyConfig nettyConfig
     _queryRouter = queryRouter;
     _tlsConfig = tlsConfig;
     _threadAccountant = threadAccountant;
+
+    // Notice here we assume there is a single ServerChannels per JVM. If that is not true (ie quickstarts with
+    // multiple brokers), we:
+    // 1. Will have one allocator per broker, which may cause higher memory usage, and may cause OOM.
+    // 2. Will have multiple sets of gauges for each allocator, which means the last one will win and override the
+    //    previous ones.
+
+    // Create a single shared allocator with limits for all channels
+    PooledByteBufAllocator defaultAllocator = PooledByteBufAllocator.DEFAULT;
+    PooledByteBufAllocatorMetric metric = defaultAllocator.metric();
+    _bufAllocatorWithLimits = PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
+    PooledByteBufAllocatorMetric bufAllocatorMetric = _bufAllocatorWithLimits.metric();
+    // Register metrics for the shared allocator
+    _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, bufAllocatorMetric::usedDirectMemory);
Review Comment:
The comment explicitly calls out multi-broker-in-one-JVM scenarios, but the implementation still constructs one allocator per `ServerChannels` instance and re-registers global gauges (last one wins). If multi-broker JVMs are expected (e.g., quickstarts), consider making the allocator (and gauge registration) truly JVM-singleton (e.g., static lazy holder + one-time registration guard), or adjust the comment to reflect the actual scope (per-ServerChannels, not per-JVM).
