Awesome. After adding --new-consumer and --bootstrap-server, I got it
working.
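
For anyone who finds this thread later, the working invocation looks roughly like the sketch below. The broker address, topic and file paths are the ones quoted further down in this thread; the -Djava.security.auth.login.config form of KAFKA_OPTS and the function name consume_test3 are assumptions made for illustration.

```shell
# Point the client JVM at the JAAS file (assumed form of the KAFKA_OPTS setting).
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/kafka/kafka_client_jaas.conf"

# Wrapped in a function so nothing runs until it is called from the Kafka directory.
consume_test3() {
  bin/kafka-console-consumer.sh --new-consumer \
    --bootstrap-server localhost:9092 \
    --topic test3 --from-beginning \
    --consumer.config config/consumer.properties
}
```

Calling consume_test3 makes the consumer talk to the broker's SASL_PLAINTEXT listener directly instead of looking up a PLAINTEXT endpoint through ZooKeeper.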

Thanks a lot.
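
On the first problem in this thread (the broker failing to authenticate against itself), the rule Rajini describes below can be checked mechanically: the password in the KafkaServer section has to equal the user_<username> entry for the same user. A rough sketch, where check_jaas is a hypothetical helper and the parsing assumes a file with a single KafkaServer section and one property per line:

```shell
# Hypothetical helper: compares the broker's own login (username/password)
# with the user_<username> entry that the broker uses to validate logins.
check_jaas() {
  conf=$1
  # First username="..." and password="..." values found in the file.
  user=$(sed -n 's/^[[:space:]]*username="\([^"]*\)".*/\1/p' "$conf" | head -n 1)
  pass=$(sed -n 's/^[[:space:]]*password="\([^"]*\)".*/\1/p' "$conf" | head -n 1)
  # Value of the matching user_<username>="..." entry.
  expected=$(sed -n "s/^[[:space:]]*user_${user}=\"\([^\"]*\)\".*/\1/p" "$conf" | head -n 1)

  if [ -z "$user" ] || [ -z "$expected" ]; then
    echo "No username or no matching user_<username> entry found"
    return 2
  fi
  if [ "$pass" = "$expected" ]; then
    echo "OK: password for $user matches user_$user"
  else
    echo "MISMATCH: password for $user does not match user_$user"
    return 1
  fi
}
```

Running check_jaas /home/kafka/kafka_client_jaas.conf against the original file quoted below would report a mismatch for admin, and would pass once user_admin is changed to "admin-secret".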

On Fri, Sep 16, 2016 at 5:33 AM, Rajini Sivaram <
rajinisiva...@googlemail.com> wrote:

> Max,
>
> You need to use the new consumer since the old consumer does not support
> security features. For console-consumer, you need to add the option
> --new-consumer.
>
> On Fri, Sep 16, 2016 at 10:14 AM, Max Bridgewater <
> max.bridgewa...@gmail.com> wrote:
>
> > Thanks Rajini. That was the issue. Now I am facing another one. I am not
> > sure why my consumer is trying to use the topic in PLAINTEXT. The consumer
> > config is:
> >
> > security.protocol=SASL_PLAINTEXT
> > sasl.mechanism=PLAIN
> >
> >
> > KAFKA_OPTS is set to /home/kafka/kafka_client_jaas.conf. I can confirm that
> > this file is being read because if I change the file name to something
> > nonexistent, I get a file-not-found exception.
> >
> > The content of this jaas file:
> >
> > KafkaClient {
> >   org.apache.kafka.common.security.plain.PlainLoginModule required
> >   username="alice"
> >   password="alice-secret";
> > };
> >
> >
> > I launch the consumer with:
> > bin/kafka-console-consumer.sh  --zookeeper localhost:2181 --topic test3
> > --from-beginning --consumer.config=config/consumer.properties
> >
> > The server config:
> >
> > listeners=SASL_PLAINTEXT://localhost:9092
> > security.inter.broker.protocol=SASL_PLAINTEXT
> > sasl.mechanism.inter.broker.protocol=PLAIN
> > sasl.enabled.mechanisms=PLAIN
> >
> > The producer config:
> >
> > security.protocol=SASL_PLAINTEXT
> > sasl.mechanism=PLAIN
> >
> > Now, when I launch the consumer, I get the following error:
> >
> > [2016-09-16 05:09:11,908] WARN [test-consumer-group_pascalvm-1474016950388-699882ba-leader-finder-thread], Failed to find leader for Set([test3,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
> > kafka.common.BrokerEndPointNotAvailableException: End point with security protocol PLAINTEXT not found for broker 0
> >     at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> >     at kafka.cluster.Broker$$anonfun$5.apply(Broker.scala:131)
> >     at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
> >     at scala.collection.AbstractMap.getOrElse(Map.scala:58)
> >     at kafka.cluster.Broker.getBrokerEndPoint(Broker.scala:130)
> >     at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:166)
> >     at kafka.utils.ZkUtils$$anonfun$getAllBrokerEndPointsForChannel$1.apply(ZkUtils.scala:166)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> >
> > What am I missing?
> >
> >
> >
> >
> > On Fri, Sep 16, 2016 at 3:57 AM, Rajini Sivaram <
> > rajinisiva...@googlemail.com> wrote:
> >
> > > Max,
> > >
> > > I think there is a typo in your configuration. Did you intend the admin
> > > password to be admin-secret?
> > >
> > > KafkaServer {
> > >    org.apache.kafka.common.security.plain.PlainLoginModule required
> > >    username="admin"
> > >    password="admin-secret"
> > >    user_admin="alice-secret"  => change to "admin-secret"
> > >    user_alice="alice-secret";
> > > };
> > >
> > >
> > > Since your inter-broker security protocol is SASL_PLAINTEXT, the controller
> > > uses SASL with the username "admin" and that connection is failing since
> > > the server thinks the expected password is "alice-secret".
> > >
> > >
> > >
> > > On Fri, Sep 16, 2016 at 8:43 AM, Max Bridgewater <
> > > max.bridgewa...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I am trying to get SASL_PLAINTEXT or SASL_SSL to work. So far I have not
> > > > been successful. I posted the full story on SO:
> > > > http://stackoverflow.com/questions/39521691/kafka-authentication-producer-unable-to-connect-producer
> > > >
> > > > Bottom line: when I start the server in SASL_PLAINTEXT mode, the
> > > > exception below keeps popping up in the logs. The first issue is that you
> > > > see it only when you change the log level to DEBUG, while in reality the
> > > > server isn't in a functioning state. Should the error be printed at ERROR
> > > > level?
> > > >
> > > > Now, the real issue is I don't understand why this is happening. It seems
> > > > the server is connecting to itself and trying to authenticate against
> > > > itself and failing to do so. What is wrong in my configuration?
> > > >
> > > > In server.properties, I have:
> > > >
> > > > listeners=SASL_PLAINTEXT://0.0.0.0:9092
> > > > security.inter.broker.protocol=SASL_PLAINTEXT
> > > > sasl.mechanism.inter.broker.protocol=PLAIN
> > > > sasl.enabled.mechanisms=PLAIN
> > > >
> > > > Replacing 0.0.0.0 with localhost or 127.0.0.1 produces the same result.
> > > >
> > > > I also have KAFKA_OPTS set to /home/kafka/kafka_client_jaas.conf. And the
> > > > content of kafka_client_jaas.conf is:
> > > >
> > > > KafkaServer {
> > > >    org.apache.kafka.common.security.plain.PlainLoginModule required
> > > >    username="admin"
> > > >    password="admin-secret"
> > > >    user_admin="alice-secret"
> > > >    user_alice="alice-secret";
> > > > };
> > > >
> > > > No client is up. The only things I have up are ZK and the Kafka server.
> > > > Here is the stack trace:
> > > >
> > > > 2016-09-15 22:06:09 DEBUG NetworkClient:496 - Initiating connection to node 0 at 0.0.0.0:9092.
> > > > 2016-09-15 22:06:09 DEBUG Acceptor:52 - Accepted connection from /127.0.0.1 on /127.0.1.1:9092. sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400]
> > > > 2016-09-15 22:06:09 DEBUG Processor:52 - Processor 0 listening to new connection from /127.0.0.1:59669
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to SEND_HANDSHAKE_REQUEST
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:133 - Creating SaslClient: client=null;service=kafka;serviceHostname=0.0.0.0;mechs=[PLAIN]
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
> > > > 2016-09-15 22:06:09 DEBUG NetworkClient:476 - Completed connection to node 0
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server state to HANDSHAKE_REQUEST
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:310 - Handle Kafka request SASL_HANDSHAKE
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:354 - Using SASL mechanism 'PLAIN' provided by client
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server state to AUTHENTICATE
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to INITIAL
> > > > 2016-09-15 22:06:09 DEBUG SaslClientAuthenticator:204 - Set SASL client state to INTERMEDIATE
> > > > 2016-09-15 22:06:09 DEBUG SaslServerAuthenticator:269 - Set SASL server state to FAILED
> > > > 2016-09-15 22:06:09 DEBUG Selector:345 - Connection with /127.0.0.1 disconnected
> > > > java.io.IOException: javax.security.sasl.SaslException: Authentication failed: Invalid JAAS configuration [Caused by javax.security.sasl.SaslException: Authentication failed: Invalid username or password]
> > > >     at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:243)
> > > >     at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
> > > >     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
> > > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> > > >     at kafka.network.Processor.poll(SocketServer.scala:472)
> > > >     at kafka.network.Processor.run(SocketServer.scala:412)
> > > >     at java.lang.Thread.run(Thread.java:745)
> > > > Caused by: javax.security.sasl.SaslException: Authentication failed: Invalid JAAS configuration [Caused by javax.security.sasl.SaslException: Authentication failed: Invalid username or password]
> > > >     at org.apache.kafka.common.security.plain.PlainSaslServer.evaluateResponse(PlainSaslServer.java:101)
> > > >     at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:228)
> > > >     ... 6 more
> > > > Caused by: javax.security.sasl.SaslException: Authentication failed: Invalid username or password
> > > >     at org.apache.kafka.common.security.plain.PlainSaslServer.evaluateResponse(PlainSaslServer.java:98)
> > > >     ... 7 more
> > > > 2016-09-15 22:06:09 DEBUG Selector:345 - Connection with 0.0.0.0/0.0.0.0 disconnected
> > > > java.io.EOFException
> > > >     at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
> > > >     at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> > > >     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:239)
> > > >     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:182)
> > > >     at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:64)
> > > >     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:318)
> > > >     at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
> > > >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
> > > >     at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:111)
> > > >     at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
> > > >     at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
> > > >     at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
> > > >     at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:181)
> > > >     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:180)
> > > >     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > > > 2016-09-15 22:06:09 DEBUG NetworkClient:463 - Node 0 disconnected.
> > > >
> > > > Any thoughts?
> > > >
> > > > Thanks,
> > > > Max.
> > > >
> > >
> > >
> > >
> > > --
> > > Regards,
> > >
> > > Rajini
> > >
> >
>
>
>
> --
> Regards,
>
> Rajini
>
