Keith Wall created KAFKA-17134:
----------------------------------
Summary: Restarting a server (same JVM) configured for OAUTHBEARER
fails with RejectedExecutionException
Key: KAFKA-17134
URL: https://issues.apache.org/jira/browse/KAFKA-17134
Project: Kafka
Issue Type: Bug
Reporter: Keith Wall
If you programmatically restart a Kafka server (3.7.1) configured for
OAUTHBEARER *within the same JVM*, the second startup attempt fails with the
stack trace given below.
The issue is that a closed {{VerificationKeyResolver}} is left behind in
{{OAuthBearerValidatorCallbackHandler.VERIFICATION_KEY_RESOLVER_CACHE}} after
the server is shut down. On restart, because the server's config is unchanged,
the cache lookup returns the same, already closed {{VerificationKeyResolver}}
and it gets reused. Its {{ScheduledThreadPoolExecutor}} is already terminated,
so the {{init}} call fails with a {{RejectedExecutionException}} when it tries
to schedule the refresh task.
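The failure mode described above can be illustrated in isolation. The following is a minimal sketch, not Kafka's code: {{Resolver}}, {{CACHE}} and the config key are hypothetical stand-ins for the resolver, the static cache and the cache key. It only assumes standard {{java.util.concurrent}} behaviour, namely that an executor that has been shut down rejects newly scheduled tasks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class StaleCacheSketch {

    // Hypothetical stand-in for a resolver that owns a refresh executor.
    static final class Resolver implements AutoCloseable {
        private final ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor();

        void init() {
            // Throws RejectedExecutionException if the executor was already shut down.
            executor.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
        }

        @Override
        public void close() {
            executor.shutdownNow();
        }
    }

    // Hypothetical static cache keyed by configuration. Being static, it
    // outlives a "server restart" that happens inside the same JVM.
    static final Map<String, Resolver> CACHE = new ConcurrentHashMap<>();

    static boolean reuseAfterCloseIsRejected() {
        String configKey = "unchanged-config"; // assumption: same config on restart

        // First "startup": the resolver is created, cached and initialised.
        Resolver first = CACHE.computeIfAbsent(configKey, k -> new Resolver());
        first.init();

        // "Shutdown": the resolver is closed, but its cache entry is not removed.
        first.close();

        // Second "startup": same key, so the closed instance is returned.
        Resolver second = CACHE.computeIfAbsent(configKey, k -> new Resolver());
        try {
            second.init();
            second.close();
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            CACHE.remove(configKey); // leave the sketch's cache clean
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected on reuse: " + reuseAfterCloseIsRejected());
    }
}
```

In this sketch the second {{init}} is rejected because the cached instance still holds the terminated executor. If the entry were evicted when the resolver is closed, the second lookup would build a fresh instance instead; whether that is the right fix for Kafka itself is not established here.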
A reproducer for this problem is found here:
[https://github.com/k-wall/oauth_bearer_leak/blob/main/src/main/java/OAuthBearerValidatorLeak.java#L51]
The reproducer can be used with this OAuth Server:
{{docker run --rm -p 8080:8080 ghcr.io/navikt/mock-oauth2-server:2.1.8}}
{{Exception in thread "main" org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver}}
{{ at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)}}
{{ at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)}}
{{ at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)}}
{{ at kafka.network.Processor.<init>(SocketServer.scala:973)}}
{{ at kafka.network.Acceptor.newProcessor(SocketServer.scala:879)}}
{{ at kafka.network.Acceptor.$anonfun$addProcessors$1(SocketServer.scala:849)}}
{{ at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)}}
{{ at kafka.network.Acceptor.addProcessors(SocketServer.scala:848)}}
{{ at kafka.network.DataPlaneAcceptor.configure(SocketServer.scala:523)}}
{{ at kafka.network.SocketServer.createDataPlaneAcceptorAndProcessors(SocketServer.scala:251)}}
{{ at kafka.network.SocketServer.$anonfun$new$31(SocketServer.scala:175)}}
{{ at kafka.network.SocketServer.$anonfun$new$31$adapted(SocketServer.scala:175)}}
{{ at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)}}
{{ at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)}}
{{ at scala.collection.AbstractIterable.foreach(Iterable.scala:933)}}
{{ at kafka.network.SocketServer.<init>(SocketServer.scala:175)}}
{{ at kafka.server.BrokerServer.startup(BrokerServer.scala:255)}}
{{ at kafka.server.KafkaRaftServer.$anonfun$startup$2(KafkaRaftServer.scala:99)}}
{{ at kafka.server.KafkaRaftServer.$anonfun$startup$2$adapted(KafkaRaftServer.scala:99)}}
{{ at scala.Option.foreach(Option.scala:437)}}
{{ at kafka.server.KafkaRaftServer.startup(KafkaRaftServer.scala:99)}}
{{ at OAuthBearerValidatorLeak.main(OAuthBearerValidatorLeak.java:51)}}
{{Caused by: org.apache.kafka.common.KafkaException: The OAuth validator configuration encountered an error when initializing the VerificationKeyResolver}}
{{ at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.init(OAuthBearerValidatorCallbackHandler.java:146)}}
{{ at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.configure(OAuthBearerValidatorCallbackHandler.java:136)}}
{{ at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:151)}}
{{ ... 21 more}}
{{Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@4f66ffc8[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@1bc49bc5[Wrapped task = org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwks$$Lambda/0x00000001373c7c88@7b6e5c12]] rejected from java.util.concurrent.ScheduledThreadPoolExecutor@39e67516[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]}}
{{ at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2081)}}
{{ at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:841)}}
{{ at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:340)}}
{{ at java.base/java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:632)}}
{{ at java.base/java.util.concurrent.Executors$DelegatedScheduledExecutorService.scheduleAtFixedRate(Executors.java:870)}}
{{ at org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwks.init(RefreshingHttpsJwks.java:198)}}
{{ at org.apache.kafka.common.security.oauthbearer.internals.secured.RefreshingHttpsJwksVerificationKeyResolver.init(RefreshingHttpsJwksVerificationKeyResolver.java:103)}}
{{ at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler$RefCountingVerificationKeyResolver.init(OAuthBearerValidatorCallbackHandler.java:266)}}
{{ at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.init(OAuthBearerValidatorCallbackHandler.java:144)}}
{{ ... 23 more}}