[ https://issues.apache.org/jira/browse/KAFKA-7631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826229#comment-16826229 ]
koert kuipers commented on KAFKA-7631: -------------------------------------- i think i ran into this. brokers are kafka 2.2.0. my brokers use GSSAPI/kerberos, but i have also have SCRAM enabled for clients that use delegation tokens: sasl.mechanism.inter.broker.protocol=GSSAPI sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512 my jaas.conf for brokers has com.sun.security.auth.module.Krb5LoginModule for KafkaClient kafka server log shows: {code} [2019-04-25 12:23:48,108] WARN [SocketServer brokerId=xx] Unexpected error from /x.x.x.x; closing connection (org.apache.kafka.common.network.Selector) java.lang.NullPointerException at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:450) at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:290) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:536) at org.apache.kafka.common.network.Selector.poll(Selector.java:472) at kafka.network.Processor.poll(SocketServer.scala:830) at kafka.network.Processor.run(SocketServer.scala:730) at java.lang.Thread.run(Thread.java:748) {code} my client is spark structured streaming driver, which in spark 3 has kafka delegation support, which is what i am testing. i see here: {code} 2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to SEND_HANDSHAKE_REQUEST 2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE 2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to INITIAL 2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.scram.internals.ScramSaslClient: Setting SASL/SCRAM_SHA_512 client state to RECEIVE_SERVER_FIRST_MESSAGE 2019-04-25 12:23:48 DEBUG org.apache.kafka.common.security.authenticator.SaslClientAuthenticator: Set SASL client state to INTERMEDIATE 2019-04-25 12:23:48 DEBUG org.apache.kafka.common.network.Selector: [Consumer clientId=x, groupId=x] Connection with x/x.x.x.x disconnected java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:407) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:497) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveToken(SaslClientAuthenticator.java:435) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:259) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:536) at org.apache.kafka.common.network.Selector.poll(Selector.java:472) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1195) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$2(KafkaOffsetReader.scala:217) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:358) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77) at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:357) at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$1(KafkaOffsetReader.scala:215) at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:325) at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:215) at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:198) at scala.Option.getOrElse(Option.scala:138) at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:193) at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:81) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$6(MicroBatchExecution.scala:365) at scala.Option.getOrElse(Option.scala:138) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:365) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:327) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:325) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:362) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.map(TraversableLike.scala:237) at scala.collection.TraversableLike.map$(TraversableLike.scala:230) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:354) at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:577) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:350) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:195) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:327) at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:325) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:178) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:172) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:331) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:243) {code} > NullPointerException when SCRAM is allowed bu ScramLoginModule is not in > broker's jaas.conf > ------------------------------------------------------------------------------------------- > > Key: KAFKA-7631 > URL: https://issues.apache.org/jira/browse/KAFKA-7631 > Project: Kafka > Issue Type: Improvement > Components: security > Affects Versions: 2.0.0 > Reporter: Andras Beni > Assignee: Viktor Somogyi-Vass > Priority: Minor > > When user wants to use delegation tokens and lists {{SCRAM}} in > {{sasl.enabled.mechanisms}}, but does not add {{ScramLoginModule}} to > broker's JAAS configuration, a null pointer exception is thrown on broker > side and the connection is closed. > Meaningful error message should be logged and sent back to the client. > {code} > java.lang.NullPointerException > at > org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleSaslToken(SaslServerAuthenticator.java:376) > at > org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:262) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489) > at org.apache.kafka.common.network.Selector.poll(Selector.java:427) > at kafka.network.Processor.poll(SocketServer.scala:679) > at kafka.network.Processor.run(SocketServer.scala:584) > at java.lang.Thread.run(Thread.java:748) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)