Re: SSL between Kafka and Spark Streaming API
SSL is supported for the new producer and consumer APIs; the old APIs (the simple consumer and the high-level consumer) are not supported. I think Spark uses the simple consumer? If so, it's not supported.

Thanks,
Harsha

On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:

Hi,
I was going through the SSL setup of Kafka: https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
However, I am also using Spark-Kafka streaming to read data from Kafka. Is there a way to activate SSL for the Spark streaming API, or is it not possible at all?
Thanks,
LCassa
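For reference, a minimal sketch of what an SSL-enabled new-producer configuration looks like once the SSL work lands (property names follow the wiki's SSL guide; the broker address, paths, and passwords are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9093"); // the broker's SSL listener port
    props.put("security.protocol", "SSL");
    props.put("ssl.truststore.location", "/var/private/ssl/client.truststore.jks");
    props.put("ssl.truststore.password", "test1234");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    producer.close();

The old SimpleConsumer and high-level consumer have no equivalent settings, which is why a SimpleConsumer-based client such as Spark's DirectStream cannot talk to an SSL-only broker.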
Re: SSL between Kafka and Spark Streaming API
You can configure a PLAINTEXT listener as well on the broker and use that port for Spark.
--
Harsha

On August 28, 2015 at 12:24:45 PM, Sourabh Chandak (sourabh3...@gmail.com) wrote:

Can we use the existing Kafka Spark streaming jar to connect to a Kafka server running in SSL mode? We are fine with a non-SSL consumer, as our Kafka cluster and Spark cluster are in the same network.
Thanks,
Sourabh

On Fri, Aug 28, 2015 at 12:03 PM, Gwen Shapira g...@confluent.io wrote:

I can't speak for the Spark community, but checking their code, DirectKafkaStream and KafkaRDD use the SimpleConsumer API:
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

On Fri, Aug 28, 2015 at 11:32 AM, Cassa L lcas...@gmail.com wrote:

Hi,
I am using the below Spark jar with the Direct Stream API: spark-streaming-kafka_2.10. When I look at its pom.xml, the Kafka library it pulls in is:

<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.2.1</version>

I believe this DirectStream API uses the SimpleConsumer API. Can someone from the Spark community confirm too?
Thanks,
LCassa.

On Fri, Aug 28, 2015 at 11:12 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

SSL is supported for the new producer and consumer APIs; the old APIs (the simple consumer and the high-level consumer) are not supported. I think Spark uses the simple consumer? If so, it's not supported.
Thanks,
Harsha

On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:

Hi,
I was going through the SSL setup of Kafka: https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
However, I am also using Spark-Kafka streaming to read data from Kafka. Is there a way to activate SSL for the Spark streaming API, or is it not possible at all?
Thanks,
LCassa
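A broker can expose an SSL port and a PLAINTEXT port side by side, which is what the suggestion above relies on. A sketch of the relevant server.properties entries, assuming a build with multi-listener support and the SSL patch (host, ports, paths, and passwords are placeholders):

    listeners=PLAINTEXT://broker1:9092,SSL://broker1:9093
    ssl.keystore.location=/var/private/ssl/server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    ssl.truststore.location=/var/private/ssl/server.truststore.jks
    ssl.truststore.password=test1234

Spark's SimpleConsumer-based DirectStream can then consume from port 9092 while SSL-capable clients use 9093.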
Re: Issue when enabling SSL on broker
Hi,
It turns out to be a bug in the instructions in the wiki; I fixed it. Can you please retry generating the truststore and keystore: https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka . Check out the section "All of the above steps in a bash script" to generate the keystores.
Thanks,
Harsha

On August 25, 2015 at 8:56:24 PM, Sriharsha Chintalapani (ka...@harsha.io) wrote:

Hi Xiang,
Did you try following the instructions here: https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka ? What's the output of openssl s_client, and which version of Java and OS are you using?
Thanks,
Harsha

On August 25, 2015 at 8:42:18 PM, Xiang Zhou (Samuel) (zhou...@gmail.com) wrote:

no cipher suites in common
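The gist of that wiki section, for anyone who cannot reach it: generate the broker key pair, create a CA, sign the broker certificate with the CA, and import the CA cert into the truststore. A sketch (aliases, file names, and validity are illustrative):

    # 1. generate the broker's key pair in a keystore
    keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
    # 2. create your own CA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    # 3. trust the CA by importing its cert into the truststore
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    # 4. export a certificate signing request for the broker cert
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
    # 5. sign it with the CA
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
    # 6. import the CA cert and the signed cert back into the keystore
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

The "no cipher suites in common" error typically means the server side has no usable key/certificate to negotiate with, consistent with keystores generated from the broken instructions.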
Re: Kafka Cluster behind Proxy
Ankit,
Did you try using advertised.host.name and advertised.port, specifying the proxy host and port details?
--
Harsha
Sent with Airmail

On August 17, 2015 at 12:28:19 AM, Ankit Jain (ankitjainc...@gmail.com) wrote:

Hi All,
We want to deploy the Kafka cluster behind a proxy. We are exposing the proxy IP to clients. The client uses the proxy IP to connect to the ZooKeeper server to consume data from the Kafka cluster, but as we know, ZooKeeper returns the broker IPs to the client and then the client communicates directly with the Kafka cluster. The IPs returned by ZooKeeper are the Kafka IPs, which are not open to the client. We want to provide all access through the proxy only. Please let me know how we can use a Kafka cluster behind a proxy.
--
Thanks,
Ankit Jain
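A sketch of the broker settings this refers to, in server.properties (the proxy address is a placeholder). The broker registers the advertised values in ZooKeeper, so clients are handed the proxy endpoint instead of the broker's internal address:

    # what the broker publishes to ZooKeeper for clients to connect to
    advertised.host.name=proxy.example.com
    advertised.port=19092
    # what the broker actually binds to
    host.name=10.0.0.5
    port=9092

Note that with several brokers, each one needs its own distinct advertised endpoint that the proxy forwards to that specific broker, since clients must be able to reach each partition leader directly.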
Re: Controlled Shutdown Tool?
Controlled shutdown is built into the broker: when this config is set to true, on shutdown the broker makes a request to the controller to initiate a controlled shutdown, waits until the request succeeds, and in case of failure retries the shutdown controlled.shutdown.max.retries times. https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaServer.scala#L175
--
Harsha

On July 27, 2015 at 11:50:27 AM, Andrew Otto (ao...@wikimedia.org) wrote:

Thanks! But how do I initiate a controlled shutdown on a running broker? Editing server.properties is not going to cause this to happen. Don't I have to tell the broker to shut down nicely?
All I really want to do is tell the controller to move leadership to other replicas, so I can shut down the broker without clients getting all confused.

On Jul 27, 2015, at 14:48, Sriharsha Chintalapani ka...@harsha.io wrote:

You can set controlled.shutdown.enable to true in Kafka's server.properties; this is enabled by default from 0.8.2 onwards. You can also set the maximum retries using controlled.shutdown.max.retries, which defaults to 3.
Thanks,
Harsha

On July 27, 2015 at 11:42:32 AM, Andrew Otto (ao...@wikimedia.org) wrote:

I'm working on packaging 0.8.2.1 for Wikimedia, and in doing so I've noticed that kafka.admin.ShutdownBroker doesn't exist anymore. From what I can tell, it has been intentionally removed in favor of a JMX(?) config, "controlled.shutdown.enable". It is unclear from the documentation how one is supposed to set this for a running broker. Do I need a special JMX tool in order to flick this switch? I'd like to add a command to my kafka bin wrapper script so that I can easily use this when restarting brokers.
What is the proper way to set controlled.shutdown.enable?
Thanks!
-Andrew Otto
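To spell out the flow: with the config enabled, you do not invoke anything yourself; stopping the broker process normally is what triggers the controlled shutdown via the JVM shutdown hook. A sketch, assuming a standard install layout:

    # server.properties
    controlled.shutdown.enable=true
    controlled.shutdown.max.retries=3

    # a normal stop then migrates leadership before the process exits
    bin/kafka-server-stop.sh        # sends SIGTERM to the broker process
    # or equivalently: kill -s TERM <broker pid>

Only an abrupt kill -9 bypasses the shutdown hook and skips the controlled shutdown.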
Re: Controlled Shutdown Tool?
You can set controlled.shutdown.enable to true in Kafka's server.properties; this is enabled by default from 0.8.2 onwards. You can also set the maximum retries using controlled.shutdown.max.retries, which defaults to 3.
Thanks,
Harsha

On July 27, 2015 at 11:42:32 AM, Andrew Otto (ao...@wikimedia.org) wrote:

I'm working on packaging 0.8.2.1 for Wikimedia, and in doing so I've noticed that kafka.admin.ShutdownBroker doesn't exist anymore. From what I can tell, it has been intentionally removed in favor of a JMX(?) config, "controlled.shutdown.enable". It is unclear from the documentation how one is supposed to set this for a running broker. Do I need a special JMX tool in order to flick this switch? I'd like to add a command to my kafka bin wrapper script so that I can easily use this when restarting brokers.
What is the proper way to set controlled.shutdown.enable?
Thanks!
-Andrew Otto
Re: Implementing a custom partitioner
Hi,
Are you using the latest trunk for the producer API? Did you implement the interface here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
--
Harsha

On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Hi all,
If I wanted to write my own partitioner, all I would need to do is write a class that extends Partitioner and override the partition function, correct? I am currently doing so, at least, with a class in the package 'services', yet when I use:

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple switch statement, so I am led to believe that I have not written my partitioner improperly. After attempting to debug the issue, I noticed that the constructor I'm entering when instantiating the producer has the line:

this.partitioner = new Partitioner();

which more or less ignores my input. Any ideas? Help is appreciated!
--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences
jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427
Re: Implementing a custom partitioner
If you are using the new producer API from Kafka 0.8.2, there is no pluggable partitioner in it; for that you need to use the latest trunk. But in 0.8.2, if you are using the old producer code, you can plug in a partitioner via the config here https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/ProducerConfig.scala#L69 by implementing this interface https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Partitioner.scala , and it gets created here https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Producer.scala#L61
Thanks,
Harsha

On July 21, 2015 at 2:54:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Sriharsha, thanks for your response. I'm using version 0.8.2, and I am implementing kafka.producer.Partitioner. I noticed that in the latest trunk the line I specified above is replaced with:

this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

Does this mean I cannot use my own partitioner in v0.8.2?

On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani ka...@harsha.io wrote:

Hi,
Are you using the latest trunk for the producer API? Did you implement the interface here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
--
Harsha

On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Hi all,
If I wanted to write my own partitioner, all I would need to do is write a class that extends Partitioner and override the partition function, correct? I am currently doing so, at least, with a class in the package 'services', yet when I use:

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple switch statement, so I am led to believe that I have not written my partitioner improperly. After attempting to debug the issue, I noticed that the constructor I'm entering when instantiating the producer has the line:

this.partitioner = new Partitioner();

which more or less ignores my input. Any ideas? Help is appreciated!
--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences
jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427
--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences
jg...@berkeley.edu | (925) 400-3427
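For the old-producer path described above, a minimal sketch of a custom partitioner (the class name and hash-based logic are illustrative):

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class MyPartitioner implements Partitioner {

        // the old producer constructs the partitioner reflectively,
        // passing its VerifiableProperties, so keep this constructor
        public MyPartitioner(VerifiableProperties props) { }

        @Override
        public int partition(Object key, int numPartitions) {
            // map keys to partitions by hash; mask to avoid negative values
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

It is wired in through the old producer's config, e.g. props.put("partitioner.class", "services.MyPartitioner");, which corresponds to the ProducerConfig.scala line linked above.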
Re: Dropping support for Scala 2.9.x
I am +1 on dropping 2.9.x support.
Thanks,
Harsha

On July 8, 2015 at 7:08:12 AM, Ismael Juma (mli...@juma.me.uk) wrote:

Hi,
The responses in this thread were positive, but there weren't many. A few months passed and Sriharsha encouraged me to reopen the thread given that the 2.9 build has been broken for at least a week[1] and no one seemed to notice. Do we want to invest more time so that the 2.9 build continues to work, or do we want to focus our efforts on 2.10 and 2.11? Please share your opinion.
Best,
Ismael
[1] https://issues.apache.org/jira/browse/KAFKA-2325

On Fri, Mar 27, 2015 at 2:20 PM, Ismael Juma mli...@juma.me.uk wrote:

Hi all,
The Kafka build currently includes support for Scala 2.9, which means that it cannot take advantage of features introduced in Scala 2.10 or depend on libraries that require it. This restricts the solutions available while trying to solve existing issues. I was browsing JIRA looking for areas to contribute and I quickly ran into two issues where this is the case:

* KAFKA-1351: "String.format is very expensive in Scala" could be solved nicely by using the string interpolation feature introduced in Scala 2.10.
* KAFKA-1595: "Remove deprecated and slower scala JSON parser from kafka.consumer.TopicCount" could be solved by using an existing JSON library, but both jackson-scala and play-json require 2.10 (argonaut supports Scala 2.9, but it brings other dependencies like scalaz).

We can work around this by writing our own code instead of using libraries, of course, but it's not ideal. Other features like Scala Futures and value classes would also be useful in some situations, I would think (for a more extensive list of new features, see http://scala-language.1934581.n4.nabble.com/Scala-2-10-0-now-available-td4634126.html ).

Another pain point of supporting 2.9.x is that it doubles the number of build and test configurations required from 2 to 4 (because the 2.9.x series was not necessarily binary compatible).

A strong argument for maintaining support for 2.9.x was the client library, but that has been rewritten in Java. It's also worth mentioning that Scala 2.9.1 was released in August 2011 (more than 3.5 years ago) and the 2.9.x series hasn't received updates of any sort since early 2013. Scala 2.10.0, in turn, was released in January 2013 (over 2 years ago) and 2.10.5, the last planned release in the 2.10.x series, has been recently released (so even 2.10.x won't be receiving updates any longer).

All in all, I think it would not be unreasonable to drop support for Scala 2.9.x in a future release, but I may be missing something. What do others think?
Ismael
Re: latest kafka consumer api maven location
Sushant,
You are using the kafka-clients new consumer API. It looks like you want to use the high-level consumer API? If so, you need to use the Kafka core lib as the dependency:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
</dependency>

More details on this page: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
--
Harsha

On June 21, 2015 at 8:02:01 AM, Shushant Arora (shushantaror...@gmail.com) wrote:

Which is the latest jar to be used for the Kafka Java client? As in:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.1</version>
</dependency>

In the class org.apache.kafka.clients.consumer.KafkaConsumer:

public Map<String, ConsumerRecords<K, V>> poll(long timeout) {
    // TODO Auto-generated method stub
    return null;
}

The poll method returns null. I want to use a high-level Java consumer. And why in org.apache.kafka.clients.consumer.ConsumerConfig is the configuration

public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

and not zookeeper.connect? If in the high-level consumer the offsets are maintained by ZooKeeper, then why is a broker address required?
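For completeness, a condensed sketch along the lines of the linked Consumer Group Example (the group id, topic, and single-threaded iteration are illustrative):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class HighLevelConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // high-level consumer connects via ZooKeeper
            props.put("group.id", "my-group");

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // request one stream (thread) for our topic
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("my-topic", 1));

            ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        }
    }

This also answers the zookeeper.connect question: the high-level consumer is configured with the ZooKeeper address, while the new kafka-clients consumer bootstraps from brokers because it no longer stores offsets in ZooKeeper.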
Re: Log file of server start up error
It looks like you have another Kafka broker process running. Stop that broker and start a new one.
--
Harsha

On May 24, 2015 at 11:05:27 PM, Sanjay Mistry (mistrysanja...@gmail.com) wrote:

Hello,
When I run the command bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testing, I am getting an "INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)" error in the server console. Please help me.

On Sat, May 23, 2015 at 12:19 PM, Sanjay Mistry mistrysanja...@gmail.com wrote:

[2015-05-23 12:16:41,624] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@70808f4e (org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:41,659] INFO Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,673] INFO Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,740] INFO Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14d7f85f7a3, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:41,743] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2015-05-23 12:16:42,015] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95)
at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:92)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:33)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:33)
at kafka.log.LogManager.lockLogDirs(LogManager.scala:92)
at kafka.log.LogManager.<init>(LogManager.scala:55)
at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
at kafka.Kafka$.main(Kafka.scala:46)
at kafka.Kafka.main(Kafka.scala)
[2015-05-23 12:16:42,023] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)
[2015-05-23 12:16:42,030] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2015-05-23 12:16:42,036] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2015-05-23 12:16:42,037] INFO Session: 0x14d7f85f7a3 closed (org.apache.zookeeper.ZooKeeper)
[2015-05-23 12:16:42,038] INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)
[2015-05-23 12:16:42,040] INFO [Kafka Server 0], shutting down (kafka.server.KafkaServer)
Re: Delete topic pending
Maybe that's a bug. We should probably alert the user that the topic doesn't exist rather than go ahead and add it to ZooKeeper to trigger a deletion. Also, to delete any topic you need to set "delete.topic.enable" to true.
--
Harsha

On May 18, 2015 at 10:16:46 AM, Dillian Murphey (crackshotm...@gmail.com) wrote:

If a broker doesn't have the topic and I run delete topic, that topic will be in a pending-delete state forever. What am I doing wrong here?
Also, what if I have data loss and I just want to delete the dang topic from ZooKeeper directly, with none of this pending stuff.
Thanks
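A sketch of the two pieces involved (the topic name and ZooKeeper address are placeholders):

    # server.properties on the brokers, before issuing the delete
    delete.topic.enable=true

    # mark the topic for deletion
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic

Without the config, the delete command only records the intent under /admin/delete_topics in ZooKeeper, which is the "pending forever" state described above.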
RE: Support https or ssl
Jamie,
Once the patch gets in, you can configure Kafka brokers with SSL configs, and similarly you can configure the producer and consumer with SSL. Both sides will negotiate an SSL connection, and once it's established the rest of the communication will happen over SSL. I am not sure about the timeline, but we are working to get this in asap.
In my previous email I was talking about https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST or https://github.com/confluentinc/kafka-rest , fronting them with an HTTP server that accepts SSL connections. It's a bit of redirection, but the end connections to the Kafka brokers won't be SSL. If that's your use case, I would suggest you wait until these patches get in.
Thanks,
Harsha

On May 8, 2015 at 1:35:17 PM, Jamie Wang (jamie.w...@actuate.com) wrote:

Thank you Harsha for the reply. Will the patch work with 0.8.2.1? So the general logic is that we write an agent that sends messages via HTTPS, and then maybe a servlet using the Kafka producer class to authenticate with the Kafka server on the SSL port, and so on. When will the full implementation be available for GA? Thanks again.
Jamie

-----Original Message-----
From: Sriharsha Chintalapani [mailto:ka...@harsha.io]
Sent: Thursday, May 07, 2015 7:22 PM
To: Jamie Wang; users@kafka.apache.org
Subject: Re: Support https or ssl

Hi Jamie,
I am currently working on providing SSL support for Kafka. Here are the JIRAs: https://issues.apache.org/jira/browse/KAFKA-1690 and https://issues.apache.org/jira/browse/KAFKA-1684 . If you are using a REST API to front the Kafka producer, then you can probably make that HTTP server be on SSL.
--
Harsha

On May 7, 2015 at 7:07:58 PM, Jamie Wang (jamie.w...@actuate.com) wrote:

Hello,
It's been a while since my team worked on a Kafka-related project. By the way, our previous project using Kafka worked wonderfully for us. Now I have a requirement to use HTTPS or SSL. I am wondering if the latest version has support for SSL. If not, what is the timeline for this functionality to be supported, and is there any suggestion on what I can do in the interim to provide similar functionality using Kafka? Thank you in advance for your time and help.
Jamie
Re: Support https or ssl
Hi Jamie,
I am currently working on providing SSL support for Kafka. Here are the JIRAs: https://issues.apache.org/jira/browse/KAFKA-1690 and https://issues.apache.org/jira/browse/KAFKA-1684 . If you are using a REST API to front the Kafka producer, then you can probably make that HTTP server be on SSL.
--
Harsha

On May 7, 2015 at 7:07:58 PM, Jamie Wang (jamie.w...@actuate.com) wrote:

Hello,
It's been a while since my team worked on a Kafka-related project. By the way, our previous project using Kafka worked wonderfully for us. Now I have a requirement to use HTTPS or SSL. I am wondering if the latest version has support for SSL. If not, what is the timeline for this functionality to be supported, and is there any suggestion on what I can do in the interim to provide similar functionality using Kafka? Thank you in advance for your time and help.
Jamie
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Hi Jay,
Sorry about the KIP formatting; I fixed it in the KIP.

"2. We certainly need to add both the serialized and unserialized form for the key as both are useful."
I added those to the interface.

"3. Do we need to add the value? I suspect people will have uses for computing something off a few fields in the value to choose the partition. This would be useful in cases where the key was being used for log compaction purposes and did not contain the full information for computing the partition."
Added it as well.

"4. This interface doesn't include either an init() or close() method. It should implement Closable and Configurable, right?"
I am not quite sure about having init() or close() for the partitioner. Are we looking at the partitioner using some external resources that need initializing and closing? If that's the case, then init() should also take some config as a param, which can add more complexity.

"5. What happens if the user both sets the partition id in the ProducerRecord and sets a partitioner? Does the partition id just get passed in to the partitioner (as sort of implied in this interface?). This is a bit weird since if you pass in the partition id you kind of expect it to get used, right? Or is it the case that if you specify a partition the partitioner isn't used at all (in which case no point in including partition in the Partitioner api)."
In the current ProducerRecord the partition id is passed to the Partitioner. If a custom partitioner is not going to use it, that's up to its implementation, right? Similarly, in our interface we have the value as another param that may or may not be used. Essentially it's up to the Partitioner to disclose what available information it is going to partition against.

Thanks,
Harsha

On April 23, 2015 at 9:11:33 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,
A few comments:
Can you finish up the KIP? There are some unfinished sentences and odd whitespace things going on.
Here are the questions I think we should consider:
1. Do we need this at all, given that we have the partition argument in ProducerRecord which gives full control? I think we do need it, because this is a way to plug in a different partitioning strategy at run time and do it in a fairly transparent way.
2. We certainly need to add both the serialized and unserialized form for the key as both are useful.
3. Do we need to add the value? I suspect people will have uses for computing something off a few fields in the value to choose the partition. This would be useful in cases where the key was being used for log compaction purposes and did not contain the full information for computing the partition.
4. This interface doesn't include either an init() or close() method. It should implement Closable and Configurable, right?
5. What happens if the user both sets the partition id in the ProducerRecord and sets a partitioner? Does the partition id just get passed in to the partitioner (as sort of implied in this interface?). This is a bit weird since if you pass in the partition id you kind of expect it to get used, right? Or is it the case that if you specify a partition the partitioner isn't used at all (in which case no point in including partition in the Partitioner api).
Cheers,
-Jay

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Hi,
Here is the KIP for adding a partitioner interface for the producer:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
There is one open question about how the interface should look. Please take a look and let me know if you prefer one way or the other.
Thanks,
Harsha
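For context, a sketch of the shape the pluggable interface eventually took in trunk; see the KIP page above for the authoritative definition:

    import java.io.Closeable;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.Configurable;

    public interface Partitioner extends Configurable, Closeable {

        // Compute the partition for a record, given both the raw and the
        // serialized forms of the key and value, plus cluster metadata
        // (partition counts, leaders) to choose from.
        int partition(String topic, Object key, byte[] keyBytes,
                      Object value, byte[] valueBytes, Cluster cluster);

        // Release any resources acquired by the partitioner.
        void close();
    }

Passing both the Object and byte[] forms reflects point 2 of the discussion: some strategies hash the serialized bytes, while others inspect fields of the unserialized object.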
[KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Hi,
Here is the KIP for adding a partitioner interface for the producer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
There is one open question about how the interface should look. Please take a look and let me know if you prefer one way or the other.
Thanks,
Harsha
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
"1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block ( http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html ). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel."

This makes sense; I can change the code to not do it on the network thread. Right now we are doing the handshake as part of the processor (it shouldn't be in the acceptor), and we have multiple processor threads. Do we still see this as an issue if it happens on the same thread as the processor?
--
Harsha
Sent with Airmail

On April 22, 2015 at 7:18:17 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote:

Hi Rajini,
Thanks for the details. I did go through your code. There was a discussion before about not putting selector-related code into the channel or extending the selector itself.

"1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block ( http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html ). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel."

This makes sense; I can change the code to not do it on the network thread.

"2. *Renegotiation handshake*: During a read operation, handshake status may indicate that renegotiation is required. It will be good to encapsulate this state change (and any knowledge of these SSL-specific state transitions) within SSLChannel. Our experience was that managing keys and state within the SSLChannel rather than in Selector made this code neater."

Do we even want to support renegotiation? This is the case where a user/client handshakes with the server anonymously but later wants to change and present their identity and establish a new SSL session. Our producers and consumers either present their identity (two-way auth) or not. Since these are long-running processes, I don't see a case where they would establish the session initially and present their identity later.

"3. *Graceful shutdown of the SSL connections*: Our experience was that we could encapsulate all of the logic for shutting down SSLEngine gracefully within SSLChannel when the selection key and state are owned and managed by SSLChannel."

Can't this be done when channel.close() is called? Any reason to own the selection key?

"4. *And finally a minor point:* We found that by managing selection key and selection interests within SSLChannel, the protocol-independent Selector didn't need the concept of handshake at all, and all channel state management and handshake-related code could be held in protocol-specific classes. This may be worth taking into consideration since it makes it easier for common network layer code to be maintained without any understanding of the details of individual security protocols."

The only thing the network code (SocketServer) is aware of is whether the channel's handshake is complete: if it is not, do the handshake; otherwise go about reading/writing from the channel. Yes, SocketServer needs to know whether a channel is ready to read or not, but on the other hand there aren't too many details of the handshake leaked into SocketServer. Either we let the server know that a channel needs a handshake, or we keep the selection-key state in the channel, which means we are adding selector-related code into the channel.

Thanks,
Harsha

On April 22, 2015 at 3:56:04 AM, Rajini Sivaram (rajinisiva...@googlemail.com) wrote:

When we were working on the client-side SSL implementation for Kafka, we found that returning selection interest from the handshake() method wasn't sufficient to handle some of the SSL sequences. We resorted to managing the selection key and interest state within SSLChannel to avoid SSL-specific knowledge escaping out of the SSL classes into protocol-independent network code. The current server-side SSL patch doesn't address these scenarios yet, but we may want to take these into account while designing the common Channel class/interface.

1. *Support for running potentially long-running delegated tasks outside the network thread*: It is recommended that delegated tasks indicated by a handshake status of NEED_TASK are run on a separate thread since they may block ( http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html ). It is easier to encapsulate this in SSLChannel without any changes to common code if selection keys are managed within the Channel.

2. *Renegotiation handshake*: During a read operation, handshake status may indicate that renegotiation is required. It will be good to encapsulate this state change (and any knowledge of these SSL-specific state transitions) within SSLChannel.
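A sketch of the delegated-task handling the first point is about, using the standard javax.net.ssl.SSLEngine API (the helper method and executor are illustrative, not the patch's actual code):

    import java.util.concurrent.ExecutorService;
    import javax.net.ssl.SSLEngine;

    // During the handshake, a status of NEED_TASK means the engine has
    // potentially blocking work to do (e.g. certificate validation).
    static void runDelegatedTasks(SSLEngine engine, ExecutorService executor) {
        Runnable task;
        while ((task = engine.getDelegatedTask()) != null) {
            // running tasks on a separate pool keeps the network
            // (processor) thread from blocking on them
            executor.submit(task);
        }
    }

The handshake status remains NEED_TASK until the submitted tasks complete, so the channel must be revisited afterwards (e.g. through its selection key); that revisit is precisely the coupling between delegated tasks and selection-key management this thread is debating.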
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
understanding of the details of individual security protocols. The channel classes we used are included in the patch in https://issues.apache.org/jira/browse/KAFKA-1690. The patch contains unit tests to validate these scenarios as well as other buffer-overflow conditions, which may be useful for server-side code when the scenarios described above are implemented.
Regards,
Rajini

On Tue, Apr 21, 2015 at 11:13 PM, Sriharsha Chintalapani harsh...@fastmail.fm wrote:

Hi Jay,
Thanks for the review.

"1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips won't that eat up all the network threads immediately? I guess I would have to look at that code to know..."

I have a non-blocking handshake on the server side as well as for the new producer client. A blocking handshake is only done for BlockingChannel.scala, and it just loops over the non-blocking handshake until the context is established. On the server side (SocketServer.scala), as it goes through the steps it returns a "READ or WRITE" signal for the next step. For BlockingChannel the worst case I look at is the connection timeout, but most times this handshake will finish much more quickly. I am cleaning up the code and will send a patch in the next few days.

"2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here."

The blocking channel is used not only by the simple consumer but also by ControllerChannelManager and controlled shutdown. Are we planning on deprecating it? I think at least for ControllerChannelManager it makes sense to have a blocking channel. If users want to lock down the cluster, i.e. no PLAINTEXT channels allowed, then all communication has to go through either SSL or Kerberos, so in that case we need to add this capability to BlockingChannel.

"3. Can we change the APIs to drop the getters when that is not required by the API being implemented. In general we don't use setters and getters as a naming convention."

My bad on adding getters and setters :). I'll work on removing them and change the KIP accordingly. I still need some accessor methods, though.

Thanks,
Harsha

On April 21, 2015 at 2:51:15 PM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Sriharsha,
Thanks for the excellent write-up. A couple of minor questions:
1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips, won't that eat up all the network threads immediately? I guess I would have to look at that code to know...
2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here.
3. Can we change the APIs to drop the getters when that is not required by the API being implemented? In general we don't use setters and getters as a naming convention. The long explanation is that setters/getters kind of imply a style of Java programming where you have simple structs with getters and setters for each field. In general we try to have access methods only when necessary, and rather than setters model the full change or action being carried out, and if possible disallow change entirely. This is more in line with modern Java style, I think. We aren't perfect in following this, but once you start with getters and setters people start just adding them everywhere and then using them.
-Jay

On Mon, Apr 20, 2015 at 10:42 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Hi,
I updated KIP-12 with more details. Please take a look: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888
Thanks,
Harsha

On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:

Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each Kafka server will run with a keytab, we should make sure they are all getting renewed.

On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:

Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager: as a standalone process, or like the controller, or is it a layer of code that does the plumbing pieces everywhere?
~ Joe Stein

On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:

Hi,
Here is the initial proposal for the SASL/Kerberos implementation for Kafka: https://cwiki.apache.org/confluence/x/YI4WAw and the JIRA: https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch.
Thanks,
Harsha
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi Jay,
Thanks for the review.

"1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips won't that eat up all the network threads immediately? I guess I would have to look at that code to know..."

I have a non-blocking handshake on the server side as well as for the new producer client. A blocking handshake is only done for BlockingChannel.scala, and it just loops over the non-blocking handshake until the context is established. On the server side (SocketServer.scala), as it goes through the steps it returns a "READ or WRITE" signal for the next step. For BlockingChannel the worst case I look at is the connection timeout, but most times this handshake will finish much more quickly. I am cleaning up the code and will send a patch in the next few days.

"2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here."

The blocking channel is used not only by the simple consumer but also by ControllerChannelManager and controlled shutdown. Are we planning on deprecating it? I think at least for ControllerChannelManager it makes sense to have a blocking channel. If users want to lock down the cluster, i.e. no PLAINTEXT channels allowed, then all communication has to go through either SSL or Kerberos, so in that case we need to add this capability to BlockingChannel.

"3. Can we change the APIs to drop the getters when that is not required by the API being implemented. In general we don't use setters and getters as a naming convention."

My bad on adding getters and setters :). I'll work on removing them and change the KIP accordingly. I still need some accessor methods, though.

Thanks,
Harsha

On April 21, 2015 at 2:51:15 PM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Sriharsha,
Thanks for the excellent write-up. A couple of minor questions:
1. Isn't the blocking handshake going to be a performance concern? Can we do the handshake non-blocking instead? If anything that causes connections to drop can incur blocking network roundtrips, won't that eat up all the network threads immediately? I guess I would have to look at that code to know...
2. Do we need to support blocking channel at all? That is just for the old clients, and I think we should probably just leave those be to reduce scope here.
3. Can we change the APIs to drop the getters when that is not required by the API being implemented? In general we don't use setters and getters as a naming convention. The long explanation is that setters/getters kind of imply a style of Java programming where you have simple structs with getters and setters for each field. In general we try to have access methods only when necessary, and rather than setters model the full change or action being carried out, and if possible disallow change entirely. This is more in line with modern Java style, I think. We aren't perfect in following this, but once you start with getters and setters people start just adding them everywhere and then using them.
-Jay

On Mon, Apr 20, 2015 at 10:42 AM, Sriharsha Chintalapani ka...@harsha.io wrote:

Hi,
I updated KIP-12 with more details. Please take a look: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888
Thanks,
Harsha

On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:

Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each Kafka server will run with a keytab, we should make sure they are all getting renewed.

On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:

Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager: as a standalone process, or like the controller, or is it a layer of code that does the plumbing pieces everywhere?
~ Joe Stein

On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:

Hi,
Here is the initial proposal for the SASL/Kerberos implementation for Kafka: https://cwiki.apache.org/confluence/x/YI4WAw and the JIRA: https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch.
Thanks,
Harsha
Re: Topic existance Java API
Mingtao,
I think you are looking at the Scala version, 2.8.2. Check your Kafka version: the artifact is named kafka_2.8.2-${version}. In 0.8.2 we have AdminUtils.topicExists(zkClient, topic).
Thanks,
Harsha

On April 20, 2015 at 2:20:27 PM, Mingtao Zhang (mail2ming...@gmail.com) wrote:

Hi,
I am on Kafka 2.8.2. I am looking for a way to check whether a topic exists or not in Java. I saw there is a method called listTopics (TopicCommand.listTopics), but the return type is void.
Is there an API to check topic existence? (preferred arguments: (zkconnect, topic))
Thanks in advance!
Best Regards,
Mingtao
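A sketch of calling it from Java in 0.8.2 (the ZooKeeper address and topic are placeholders; ZKStringSerializer$.MODULE$ is the Java-visible form of Kafka's Scala serializer object):

    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import org.I0Itec.zkclient.ZkClient;

    // topic metadata lives in ZooKeeper, so the check goes through ZkClient
    ZkClient zkClient = new ZkClient("localhost:2181", 6000, 6000,
            ZKStringSerializer$.MODULE$);
    boolean exists = AdminUtils.topicExists(zkClient, "my-topic");
    System.out.println("topic exists: " + exists);
    zkClient.close();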
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi Jun,
I am using the underlying GSS-API protocol that SASL also uses. I can add details about LDAP/AD. For AD, this is in general the integration of AD with Kerberos, i.e. Kerberos can talk to AD to get the kinit login credentials (more of a setup detail between Kerberos and AD). For LDAP, GSS-API allows you to do DIGEST auth as well. I'll add the details regarding both of these. For SSL support, I'll add the details to the same KIP, as they both extend the same Channel and share some of the implementation.
Thanks,
Harsha

On April 20, 2015 at 12:31:12 PM, Jun Rao (j...@confluent.io) wrote:

Hi, Harsha,
For SASL, a common use case is the integration with LDAP/AD. For completeness, could you describe (or provide a link to) how such integration can be done? Also, what about the SSL support: do you plan to describe it in the same KIP or a separate one?
Thanks,
Jun

On Mon, Apr 20, 2015 at 12:42 PM, Sriharsha Chintalapani ka...@harsha.io wrote:

Hi,
I updated KIP-12 with more details. Please take a look: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888
Thanks,
Harsha

On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:

Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each Kafka server will run with a keytab, we should make sure they are all getting renewed.

On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:

Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager: as a standalone process, or like the controller, or is it a layer of code that does the plumbing pieces everywhere?
~ Joe Stein

On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:

Hi,
Here is the initial proposal for the SASL/Kerberos implementation for Kafka: https://cwiki.apache.org/confluence/x/YI4WAw and the JIRA: https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch.
Thanks,
Harsha
Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation
Hi,
I updated KIP-12 with more details. Please take a look: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888
Thanks,
Harsha

On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:

Thanks Joe. It will be part of KafkaServer and will run on its own thread. Since each Kafka server will run with a keytab, we should make sure they are all getting renewed.

On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:

Thanks Harsha, looks good so far. How were you thinking of running the KerberosTicketManager: as a standalone process, or like the controller, or is it a layer of code that does the plumbing pieces everywhere?
~ Joe Stein

On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:

Hi,
Here is the initial proposal for the SASL/Kerberos implementation for Kafka: https://cwiki.apache.org/confluence/x/YI4WAw and the JIRA: https://issues.apache.org/jira/browse/KAFKA-1686. I am currently working on a prototype which will add more details to the KIP. Just opening the thread to say the work is in progress. I'll update the thread with an initial prototype patch.
Thanks,
Harsha