Hello, I am trying to set up Kafka with custom authentication & authorisation. The authentication is PLAIN, i.e. user/pass.
On a single broker, this worked fine (i.e. users were able to authenticate correctly), but in a multi-broker set-up I am struggling to get the interbroker communication to work correctly. I am seeing errors of this nature (see especially the last line), [2019-10-16 12:59:52,909] INFO Authorising! (com.gsacapital.dsg.authorisation.SimpleAuthorisation) [2019-10-16 12:59:52,910] INFO Context: User:admin trying ClusterAction on Cluster:LITERAL:kafka-cluster (com.gsacapital.dsg.authorisation.SimpleAuthorisation) [2019-10-16 12:59:52,910] INFO Principal: User:admin (com.gsacapital.dsg.authorisation.SimpleAuthorisation) [2019-10-16 12:59:52,910] INFO Detected superUser. Permission granted. (com.gsacapital.dsg.authorisation.SimpleAuthorisation) [2019-10-16 12:59:52,921] DEBUG [ReplicaFetcher replicaId=1001, leaderId=1003, fetcherId=0] Node 1003 sent an incremental fetch response for session 848569259 with 0 response partition(s), 17 implied partition(s) (org.apache.kafka.clients.FetchSessionHandler) [2019-10-16 12:59:52,922] DEBUG [ReplicaFetcher replicaId=1001, leaderId=1003, fetcherId=0] Built incremental fetch (sessionId=848569259, epoch=810) for node 1003. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 17 partition(s) (org.apache.kafka.clients.FetchSessionHandler) [2019-10-16 12:59:52,927] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,017] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,025] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,025] DEBUG Set SASL server state to FAILED during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,025] INFO [SocketServer brokerId=1001] Failed authentication with /10.210.65.111 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector) [2019-10-16 12:59:53,106] DEBUG connections.max.reauth.ms for mechanism=PLAIN: 0 (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,106] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,106] DEBUG Handling Kafka request API_VERSIONS during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,106] DEBUG Set SASL server state to HANDSHAKE_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,106] DEBUG Set SASL server state to FAILED during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,106] INFO [SocketServer brokerId=1001] Failed authentication with /10.250.2.56 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector) [2019-10-16 12:59:53,118] DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) [2019-10-16 12:59:53,118] DEBUG Set SASL server state to FAILED during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator) *[2019-10-16 12:59:53,118] INFO [SocketServer brokerId=1001] Failed authentication with /10.210.65.111 <http://10.210.65.111> (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)* I am running from the Confluent 5.3.1 kafka Docker container, with these environment variables: KAFKA_AUTHORIZER_CLASS_NAME:"com.gsacapital.dsg.authorisation.SimpleAuthorisation" KAFKA_AUTO_CREATE_TOPICS_ENABLE:"false" KAFKA_SECURITY_PROTOCOL:"SASL_PLAINTEXT" KAFKA_LISTENER_NAME_EXTLISTENER_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS:"com.gsacapital.dsg.authentication.SimpleAuthentication" KAFKA_ADVERTISED_LISTENERS:"EXTLISTENER:// broker-2.kafka.test.dsg.intra.gsacapital.com:9900" KAFKA_SASL_ENABLED_MECHANISMS:"PLAIN" KAFKA_BROKER_ID_GENERATION_ENABLE:"false" KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS:"com.gsacapital.dsg.authentication.SimpleAuthentication" KAFKA_ZOOKEEPER_CONNECT:"zookeeper.test.dsg.intra.gsacapital.com:2181" KAFKA_BROKER_ID:"1002" ZOOKEEPER_SASL_ENABLED:"FALSE" KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:"EXTLISTENER:SASL_PLAINTEXT" KAFKA_LOG4J_ROOT_LOGLEVEL:"DEBUG" KAFKA_INTER_BROKER_LISTENER_NAME:"EXTLISTENER" KAFKA_DELETE_TOPIC_ENABLE:"true" KAFKA_SASL_PROTOCOL:"SASL_PLAINTEXT,PLAIN" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:"3" KAFKA_LISTENERS:"EXTLISTENER://0.0.0.0:9900" KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL:"PLAIN" KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE:"false" KAFKA_OPTS:"-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf" and the kafka_server_jaas.conf looks like this (for all 3 brokers): KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_kafkabroker1="kafkabroker1-secret"; }; Is anything obviously missing here? The authentication and authorisation handlers are inspired by (but not identical to) the work in https://github.com/navikt/kafka-plain-saslserver-2-ad Any suggestions would be much appreciated! -Joris.