Awesome. After adding --new-consumer and --bootstrap-server, I got it working.
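For anyone who finds this thread later, here is a rough sketch of what the working invocation presumably looked like, pieced together from the configs quoted in this thread. The broker address localhost:9092 comes from the listeners line of the server config. The -Djava.security.auth.login.config form of KAFKA_OPTS is the usual way to point the JVM at a JAAS file, and is an assumption here, since the thread only says KAFKA_OPTS is "set to" the file path. The snippet is a dry run: it only prints the command.

```shell
#!/bin/sh
# Values taken from the thread; adjust them for your own setup.
BOOTSTRAP_SERVER="localhost:9092"              # from listeners=SASL_PLAINTEXT://localhost:9092
TOPIC="test3"
CONSUMER_CONFIG="config/consumer.properties"   # holds security.protocol and sasl.mechanism
JAAS_FILE="/home/kafka/kafka_client_jaas.conf"

# Assumption: the JAAS file is passed to the JVM as a system property.
export KAFKA_OPTS="-Djava.security.auth.login.config=$JAAS_FILE"

# The old consumer (--zookeeper) has no security support, so use the new
# consumer and connect to the broker directly instead of going through ZooKeeper.
CONSUMER_CMD="bin/kafka-console-consumer.sh --new-consumer --bootstrap-server $BOOTSTRAP_SERVER --topic $TOPIC --from-beginning --consumer.config $CONSUMER_CONFIG"

# Dry run: print the command instead of executing it.
echo "$CONSUMER_CMD"
```

To actually start the consumer, run the printed command from the Kafka installation directory with KAFKA_OPTS exported as above.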
Thanks a lot.

On Fri, Sep 16, 2016 at 5:33 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:

> Max,
>
> You need to use the new consumer since the old consumer does not support
> security features. For console-consumer, you need to add the option
> --new-consumer.
>
> On Fri, Sep 16, 2016 at 10:14 AM, Max Bridgewater <max.bridgewa...@gmail.com> wrote:
>
> > Thanks Rajini. That was the issue. Now I am facing another one. I am not
> > sure why my consumer is trying to use the topic in PLAINTEXT. The consumer
> > config is:
> >
> > security.protocol=SASL_PLAINTEXT
> > sasl.mechanism=PLAIN
> >
> > KAFKA_OPTS is set to /home/kafka/kafka_client_jaas.conf. I can confirm that
> > this file is being read because if I change the file name to something
> > non-existing, I get file not found exception.
> >
> > The content of this jaas file:
> >
> > KafkaClient {
> >   org.apache.kafka.common.security.plain.PlainLoginModule required
> >   username="alice"
> >   password="alice-secret";
> > };
> >
> > I launch the consumer with:
> >
> > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test3 --from-beginning --consumer.config=config/consumer.properties
> >
> > The server config:
> >
> > listeners=SASL_PLAINTEXT://localhost:9092
> > security.inter.broker.protocol=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=PLAIN
> > sasl.enabled.mechanisms=PLAIN
> >
> > The producer config:
> >
> > security.protocol=SASL_PLAINTEXT
> > sasl.mechanism=PLAIN
> >
> > Now, when I launch the consumer, I get following error:
> >
> > [2016-09-16 05:09:11,908] WARN [test-consumer-group_pascalvm-1474016950388-699882ba-leader-finder-thread], Failed to find leader for Set([test3,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > kafka.common.BrokerEndPointNotAvailableException: End point with security protocol PLAINTEXT not found for broker 0
> >     at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> >     at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> >     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> >     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> >     at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:130)
> >     at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:166)
> >     at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:166)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > What am I missing?
> >
> > On Fri, Sep 16, 2016 at 3:57 AM, Rajini Sivaram <rajinisiva...@googlemail.com> wrote:
> >
> > > Max,
> > >
> > > I think there is a typo in your configuration. You intended admin password
> > > to be admin-secret?
> > >
> > > KafkaServer {
> > >   org.apache.kafka.common.security.plain.PlainLoginModule required
> > >   username="admin"
> > >   password="admin-secret"
> > >   user_admin="alice-secret"   => Change to "admin-secret"
> > >   user_alice="alice-secret";
> > > };
> > >
> > > Since your inter-broker security protocol is SASL_PLAINTEXT, the controller
> > > uses SASL with the username "admin" and that connection is failing since
> > > the server thinks the expected password is "alice-secret".
> > >
> > > On Fri, Sep 16, 2016 at 8:43 AM, Max Bridgewater <max.bridgewa...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I am trying to get SASL_PLAINTEXT or SASL_SSL to work. So far I am not
> > > > successful. I posted the full story on SO:
> > > > http://stackoverflow.com/questions/39521691/kafka-authentication-producer-unable-to-connect-producer
> > > >
> > > > Bottom line is, when I start the server in SASL_PLAINTEXT mode, the below
> > > > exception keeps popping up in the logs. The first issue is that you see it
> > > > only when you change log level to DEBUG, while in reality the server isn't
> > > > in a functioning state. Should the error be printed at error level?
> > > >
> > > > Now, the real issue is I don't understand why this is happening. It seems
> > > > the server is connecting to itself and trying to authenticate against
> > > > itself and failing to do so. What is wrong in my configuration?
> > > >
> > > > In server.properties, I have:
> > > >
> > > > listeners=SASL_PLAINTEXT://0.0.0.0:9092
> > > > security.inter.broker.protocol=SASL_PLAINTEXT
> > > > sasl.mechanism.inter.broker.protocol=PLAIN
> > > > sasl.enabled.mechanisms=PLAIN
> > > >
> > > > Replacing 0.0.0.0 with localhost and 127.0.0.1 produces same result.
> > > >
> > > > I also have KAFKA_OPTS set to /home/kafka/kafka_client_jaas.conf. And the
> > > > content of kafka_client_jaas.conf is:
> > > >
> > > > KafkaServer {
> > > >   org.apache.kafka.common.security.plain.PlainLoginModule required
> > > >   username="admin"
> > > >   password="admin-secret"
> > > >   user_admin="alice-secret"
> > > >   user_alice="alice-secret";
> > > > };
> > > >
> > > > No client is up. The only things I have up are ZK and the Kafka server.
> > > > Here is the stack trace:
> > > >
> > > > 2016-09-15 22:06:09 DEBUG NetworkClient:496 - Initiating connection to node 0 at 0.0.0.0:9092.
> > > > 2016-09-15 22:06:09 DEBUG Acceptor:52 - Accepted connection from /127.0.0.1 on /127.0.1.1:9092. sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400]
> > > > 2016-09-15 22:06:09 DEBUG Processor:52 - Processor 0 listening to new connection from /127.0.0.1:59669
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to SEND_HANDSHAKE_REQUEST
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:133 - Creating SaslClient: client=null;service=kafka;serviceHostname=0.0.0.0;mechs=[PLAIN]
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
> > > > 2016-09-15 22:06:09 DEBUG NetworkClient:476 - Completed connection to node 0
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server state to HANDSHAKE_REQUEST
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:310 - Handle Kafka request SASL_HANDSHAKE
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:354 - Using SASL mechanism 'PLAIN' provided by client
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server state to AUTHENTICATE
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to INITIAL
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to INTERMEDIATE
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server state to FAILED
> > > > 2016-09-15 22:06:09 DEBUG Selector:345 - Connection with /127.0.0.1 disconnected
> > > > java.io.IOException: javax.security.sasl.SaslException: Authentication failed: Invalid JAAS configuration [Caused by javax.security.sasl.SaslException: Authentication failed: Invalid username or password]
> > > >     at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:243)
> > > >     at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
> > > >     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
> > > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> > > >     at kafka.network.Processor.poll(SocketServer.scala:472)
> > > >     at kafka.network.Processor.run(SocketServer.scala:412)
> > > >     at java.lang.Thread.run(Thread.java:745)
> > > > Caused by: javax.security.sasl.SaslException: Authentication failed: Invalid JAAS configuration [Caused by javax.security.sasl.SaslException: Authentication failed: Invalid username or password]
> > > >     at org.apache.kafka.common.security.plain.PlainSaslServer.evaluateResponse(PlainSaslServer.java:101)
> > > >     at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:228)
> > > >     ... 6 more
> > > > Caused by: javax.security.sasl.SaslException: Authentication failed: Invalid username or password
> > > >     at org.apache.kafka.common.security.plain.PlainSaslServer.evaluateResponse(PlainSaslServer.java:98)
> > > >     ... 7 more
> > > > 2016-09-15 22:06:09 DEBUG Selector:345 - Connection with 0.0.0.0/0.0.0.0 disconnected
> > > > java.io.EOFException
> > > >     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
> > > >     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> > > >     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239)
> > > >     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182)
> > > >     at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
> > > >     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
> > > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> > > >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> > > >     at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:111)
> > > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
> > > >     at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
> > > >     at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
> > > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
> > > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
> > > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > > 2016-09-15 22:06:09 DEBUG NetworkClient:463 - Node 0 disconnected.
> > > >
> > > > Any thoughts?
> > > >
> > > > Thanks,
> > > > Max.
> > >
> > > --
> > > Regards,
> > >
> > > Rajini
>
> --
> Regards,
>
> Rajini