Re: Does Kafka support dynamic ACL rules?
Hi,

While creating the ACL you can do that; however, when Kafka authorizes, it does not support any REGEX for users. You have to create a new Authorizer class by extending Authorizer, for instance "SimpleAclAuthorizer.scala" -> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala

Then you can tell Kafka to use your authorizer from the server.properties as:

    authorizer.class.name=com.example.CustomAclAuthorizer

Then you can do any kind of authorization yourself.

Regards

hui happy wrote on Thu, 27 Dec 2018 at 02:18:
> Hi
>
> As I learned, Kafka can use '--resource-pattern-type prefixed' to add a
> rule for prefixed topics.
> For example, for a user 'kafkaclient', we could define a rule that lets the
> user access all topics starting with that user name, i.e., 'kafkaclient--',
> such as 'kafkaclient--topic1', 'kafkaclient--topic2', etc.
>
>     /opt/kafka/bin/kafka-acls.sh \
>       --authorizer-properties zookeeper.connect=zookeeper:2181 \
>       --add \
>       --allow-principal User:"kafkaclient" \
>       --operation All \
>       --resource-pattern-type prefixed \
>       --topic "kafkaclient--"
>
> But is it possible to define a dynamic user name?
> In the above case we know the username is 'kafkaclient', and if there are
> many other users, we have to add a rule for each user; these rules are
> similar, except for the user name.
>
> So I want to know if it's possible to just define a single rule, using a
> dynamic user name, so that each user could access the topics starting with
> its own username. Something like:
>
>     /opt/kafka/bin/kafka-acls.sh \
>       --authorizer-properties zookeeper.connect=zookeeper:2181 \
>       --add \
>       --allow-principal User:"**" \
>       --operation All \
>       --resource-pattern-type prefixed \
>       --topic "**--"
>
> Then whenever we add a user or add a topic later, we don't need to add any
> rules.
>
> Thanks.
> Hui
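The custom Authorizer suggested in the reply above, written out as an added example (it is neither Kafka's code nor the reply author's). It encodes the rule from the question, each user owns the topics prefixed with its own name followed by "--", so no per-user ACL is needed, and defers every other decision to the stock ACL authorizer. It assumes the pluggable org.apache.kafka.server.authorizer.Authorizer API (Kafka 2.4 and later) and extends kafka.security.authorizer.AclAuthorizer, the successor of the SimpleAclAuthorizer linked above; on Kafka 2.0 the same ownsTopic rule would go into the older Scala authorize(session, operation, resource) method instead. The class name com.example.CustomAclAuthorizer is the one used in the server.properties line above; the "--" delimiter is taken from the question.

```java
package com.example;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import kafka.security.authorizer.AclAuthorizer;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;

/**
 * Grants every authenticated "User:" principal full access to topics named
 * "<username>--*" and hands every other decision to the stock ACL authorizer.
 * Example code, not part of Apache Kafka; assumes the Kafka 2.4+ Authorizer API.
 */
public class CustomAclAuthorizer extends AclAuthorizer {

    // Separator between the owning user name and the rest of the topic name
    static final String DELIMITER = "--";

    @Override
    public List<AuthorizationResult> authorize(AuthorizableRequestContext ctx, List<Action> actions) {
        KafkaPrincipal principal = ctx.principal();
        // Only "User:" principals own prefixes; anything else goes through the ACLs
        String user = KafkaPrincipal.USER_TYPE.equals(principal.getPrincipalType())
                ? principal.getName() : null;

        AuthorizationResult[] results = new AuthorizationResult[actions.size()];
        List<Action> remaining = new ArrayList<>();
        List<Integer> remainingIndex = new ArrayList<>();

        for (int i = 0; i < actions.size(); i++) {
            Action action = actions.get(i);
            if (ownsTopic(user, action.resourcePattern())) {
                results[i] = AuthorizationResult.ALLOWED;
            } else {
                // Defer to the ACLs stored in the cluster, so super.users,
                // allow.everyone.if.no.acl.found and deny logging keep working
                remaining.add(action);
                remainingIndex.add(i);
            }
        }

        if (!remaining.isEmpty()) {
            List<AuthorizationResult> aclResults = super.authorize(ctx, remaining);
            for (int j = 0; j < remaining.size(); j++) {
                results[remainingIndex.get(j)] = aclResults.get(j);
            }
        }
        return Arrays.asList(results);
    }

    /**
     * The ownership rule. Only literal topic names are considered (that is what
     * a request carries), the user name must be non-empty, and a user name that
     * itself contains the delimiter is refused: otherwise "a--b--x" would be
     * owned by both "a" and "a--b", and ownership must be unambiguous.
     */
    static boolean ownsTopic(String user, ResourcePattern resource) {
        if (user == null || user.isEmpty() || user.contains(DELIMITER)) {
            return false;
        }
        return resource.resourceType() == ResourceType.TOPIC
                && resource.patternType() == PatternType.LITERAL
                && resource.name().startsWith(user + DELIMITER);
    }
}
```

Put the jar on the broker classpath and register it with authorizer.class.name=com.example.CustomAclAuthorizer as in the reply. Because the rule is code rather than stored ACL data, revoking a user's access to its own prefix is a configuration change instead of a kafka-acls.sh --remove; decisions that fall through to the ACL path are logged by the stock authorizer exactly as before.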
AdminClient error while calling the createACL function
Hi,

I am running a 1-node Kafka setup in my local Docker and trying to create ACLs by using the AdminClient API, but I am receiving the following error:

    [2018-11-12 23:21:20,065] TRACE [AdminClient clientId=adminclient-1] Metadata is not ready: we have not fetched metadata from the bootstrap nodes yet. (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
    [2018-11-12 23:21:20,066] DEBUG [AdminClient clientId=adminclient-1] Requesting metadata update. (org.apache.kafka.clients.admin.internals.AdminMetadataManager)
    [2018-11-12 23:21:20,066] TRACE [AdminClient clientId=adminclient-1] Unable to assign Call(callName=createAcls, deadlineMs=1542064880086) to a node. (org.apache.kafka.clients.admin.KafkaAdminClient)
    [2018-11-12 23:21:20,066] TRACE [AdminClient clientId=adminclient-1] Entering KafkaClient#poll(timeout=22) (org.apache.kafka.clients.admin.KafkaAdminClient)
    [2018-11-12 23:21:20,091] TRACE [AdminClient clientId=adminclient-1] KafkaClient#poll retrieved 0 response(s) (org.apache.kafka.clients.admin.KafkaAdminClient)
    [2018-11-12 23:21:20,094] DEBUG [AdminClient clientId=adminclient-1] Call(callName=createAcls, deadlineMs=1542064880086) timed out at 1542064880092 after 1 attempt(s) (org.apache.kafka.clients.admin.KafkaAdminClient)
    java.lang.Exception: TimeoutException: Timed out waiting for a node assignment.
        at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:603)
        at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:721)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutPendingCalls(KafkaAdminClient.java:789)
        at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1083)
        at java.lang.Thread.run(Thread.java:748)

I tried to increase the timeout value, but it did not work. I am using Kafka 2.0.

I could not find anything else; can you please help? Might this be related to the single-node setup?

Thanks in advance
org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker.
Hi,

I am trying to create ACLs programmatically during the connection, inside the AuthenticationCallbackHandler. I am using the AdminClient API as:

*** Initialize AdminClient ***

    Map<String, Object> properties = new HashMap<>(); // TODO: read those from properties
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
    adminClient = AdminClient.create(properties);

*** Actual call to create ACLs ***

    try {
        CreateAclsResult acls = adminClient.createAcls(aclList);
        acls.all().get();
    } catch (Exception e) {
        LOG.error("Error received while creating the ACLs", e);
        adminClient.close();
    }

But I am getting the following error in the Kafka server:

    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is configured on the broker.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)

Does anybody have any idea? I appreciate the help.

Thanks
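For both AdminClient threads above, an added example (not code from either poster) that creates the prefixed-topic ACL from the first thread over the AdminClient API and tells the two reported failures apart. SecurityDisabledException is the broker saying that no authorizer.class.name is configured, so it has nowhere to store or evaluate ACLs; the TimeoutException "Timed out waiting for a node assignment" means no broker ever answered the client's metadata request, which in a Docker setup is typically a bootstrap.servers or advertised.listeners value the client cannot reach, not something a longer timeout fixes. The bootstrap address, the user name and the "--" delimiter are the values used in the threads above.

```java
package com.example;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

/**
 * Creates "User:<name> may do anything on topics <name>--*" over AdminClient,
 * the same binding as kafka-acls.sh --resource-pattern-type prefixed.
 * Example code, not part of Apache Kafka.
 */
public class PrefixedAclCreator {

    /** The binding kafka-acls.sh --allow-principal User:<user> --operation All --topic "<user>--" would create. */
    static AclBinding ownerBinding(String user) {
        ResourcePattern topics = new ResourcePattern(ResourceType.TOPIC, user + "--", PatternType.PREFIXED);
        // Host "*" mirrors the CLI default of allowing the principal from any host
        AccessControlEntry entry = new AccessControlEntry("User:" + user, "*",
                AclOperation.ALL, AclPermissionType.ALLOW);
        return new AclBinding(topics, entry);
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumed defaults: the local broker and user name from the threads above
        String bootstrap = args.length > 0 ? args[0] : "localhost:9092";
        String user = args.length > 1 ? args[1] : "kafkaclient";

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");

        // try-with-resources closes the client on every path, including failure
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createAcls(Collections.singletonList(ownerBinding(user))).all().get();
            System.out.println("ACL created for User:" + user + " on topics " + user + "--*");
        } catch (ExecutionException e) {
            // The future wraps the broker-side error; the cause is the useful part
            Throwable cause = e.getCause();
            if (cause instanceof SecurityDisabledException) {
                // Broker has no authorizer.class.name set: set one (e.g. the stock
                // ACL authorizer) in server.properties and restart the broker
                System.err.println("Broker has no authorizer configured: " + cause.getMessage());
            } else if (cause instanceof TimeoutException) {
                // No broker answered metadata requests: check bootstrap.servers and
                // that advertised.listeners is reachable from this client's network
                System.err.println("No broker reachable via " + bootstrap + ": " + cause.getMessage());
            } else {
                throw new RuntimeException("createAcls failed", cause);
            }
        }
    }
}
```

Run it as java com.example.PrefixedAclCreator localhost:9092 kafkaclient against a broker that has an authorizer configured; against the broker from the third thread it prints the "no authorizer configured" line and exits cleanly instead of leaking the client.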
Custom Token Authentication Callback
Hi,

I am trying to build a custom sequence to authenticate the clients. According to the documentation, I need to create 2 different Callbacks:

    sasl.login.callback.handler.class -> I am assuming this is for when the first connect request comes; then I should create a JWT token inside this Callback
    sasl.server.callback.handler.class -> This is the one validating the token

The use of those 2 different implementations is really not clear. So I wanted to give it a try, but I am getting an error from the Callback's handle function while the server is starting. My expectation was that the handle function should run only when a client tries to connect?

I am confused; please help on this one.

Kafka version: 2.0.0

My sequence:
- Clients will have a token and send it to Kafka for verification
- No inter-broker authentication required

Config details:

    listener.security.protocol.map = INSIDE:PLAINTEXT,OUTSIDE:SASL_PLAINTEXT
    listeners = INSIDE://:9092,OUTSIDE://:9094
    sasl.enabled.mechanisms = [OAUTHBEARER]

ENV:

    KAFKA_LISTENER_NAME_OUTSIDE_OAUTHBEARER_SASL_LOGIN_CALLBACK_HANDLER_CLASS=
    KAFKA_LISTENER_NAME_OUTSIDE_OAUTHBEARER_SASL_SERVER_CALLBACK_HANDLER_CLASS=

Thanks for the help