Re: SSL between Kafka and Spark Streaming API

2015-08-28 Thread Sriharsha Chintalapani
SSL is supported for the new producer and consumer APIs; the old APIs (simple consumer
and high-level consumer) are not supported.
I think Spark uses the SimpleConsumer? If so, it's not supported.

Thanks,
Harsha


On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:

Hi, 
I was going through SSL setup of Kafka. 
https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka 
However, I am also using Spark-Kafka streaming to read data from Kafka. Is
there a way to activate SSL for the Spark streaming API, or is it not possible at all?

Thanks, 
LCassa 


Re: SSL between Kafka and Spark Streaming API

2015-08-28 Thread Sriharsha Chintalapani
You can configure a PLAINTEXT listener on the broker as well and use that port
for Spark.
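
For example, a broker can expose both listeners side by side. A minimal sketch of
server.properties, assuming the multi-listener syntax introduced by the SSL work
(host and ports are placeholders):

    listeners=PLAINTEXT://broker1.example.com:9092,SSL://broker1.example.com:9093

Spark's consumer would then point at port 9092 while SSL-capable clients use 9093.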

-- 
Harsha


On August 28, 2015 at 12:24:45 PM, Sourabh Chandak (sourabh3...@gmail.com) 
wrote:

Can we use the existing Kafka Spark streaming jar to connect to a Kafka server
running in SSL mode?

We are fine with a non-SSL consumer, as our Kafka cluster and Spark cluster are on
the same network.


Thanks,
Sourabh

On Fri, Aug 28, 2015 at 12:03 PM, Gwen Shapira g...@confluent.io wrote:
I can't speak for the Spark Community, but checking their code,
DirectKafkaStream and KafkaRDD use the SimpleConsumer API:

https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala

On Fri, Aug 28, 2015 at 11:32 AM, Cassa L lcas...@gmail.com wrote:

 Hi, I am using the below Spark jar with the Direct Stream API:
   spark-streaming-kafka_2.10

 When I look at its pom.xml, the Kafka library that it is pulling in is:
    <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>0.8.2.1</version>


 I believe this DirectStream API uses SimpleConsumer API. Can someone from
 Spark community confirm too?

 Thanks,
 LCassa.

 On Fri, Aug 28, 2015 at 11:12 AM, Sriharsha Chintalapani ka...@harsha.io
 wrote:

  SSL is supported for the new producer and consumer APIs; the old APIs (simple
  consumer and high-level consumer) are not supported.
  I think Spark uses the SimpleConsumer? If so, it's not supported.
 
  Thanks,
  Harsha
 
 
  On August 28, 2015 at 11:00:30 AM, Cassa L (lcas...@gmail.com) wrote:
 
  Hi,
  I was going through SSL setup of Kafka.
 
 https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka
  However, I am also using Spark-Kafka streaming to read data from Kafka.
 Is
  there a way to activate SSL for spark streaming API or not possible at
  all?
 
  Thanks,
  LCassa
 
 




Re: Issue when enabling SSL on broker

2015-08-25 Thread Sriharsha Chintalapani
Hi,
      Turns out there was a bug in the instructions in the wiki. I fixed it; can
you please retry generating the truststore and keystore:
https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka .
Check out the section “All of the above steps in a bash script” to generate the
keystores.
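
For reference, the script's steps boil down to roughly the following. This is a
sketch with placeholder file names and validity periods; the wiki page above is
the authoritative version:

    # 1) generate a key pair for the broker
    keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey
    # 2) create your own CA
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    # 3) import the CA cert into the client/server truststores
    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
    # 4) sign the broker certificate with the CA, then import the chain into the keystore
    keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
    keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
    keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed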

Thanks,
Harsha


On August 25, 2015 at 8:56:24 PM, Sriharsha Chintalapani (ka...@harsha.io) 
wrote:

Hi Xiang,
         Did you try following the instructions here:
https://cwiki.apache.org/confluence/display/KAFKA/Deploying+SSL+for+Kafka ?
What's the output of openssl s_client, and which version of Java and OS are you
using?

Thanks,
Harsha


On August 25, 2015 at 8:42:18 PM, Xiang Zhou (Samuel) (zhou...@gmail.com) wrote:

no cipher suites in common 

Re: Kafka Cluster behind Proxy

2015-08-17 Thread Sriharsha Chintalapani
Ankit,
       Did you try using advertised.host.name and advertised.port, specifying the
proxy host and port details?
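
For example, a sketch with a hypothetical proxy address (both settings are
standard 0.8.x broker properties in server.properties):

    advertised.host.name=proxy.example.com
    advertised.port=9092

ZooKeeper will then hand clients the advertised proxy address instead of the
broker's own IP.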

-- 
Harsha
Sent with Airmail

On August 17, 2015 at 12:28:19 AM, Ankit Jain (ankitjainc...@gmail.com) wrote:

Hi All,  

We want to deploy the Kafka cluster behind a proxy. We are exposing the
proxy IP to clients. The client uses the proxy IP to connect to the
ZooKeeper server to consume data from the Kafka cluster, but as we know,
ZooKeeper returns the broker IPs to the client, and the client then directly
communicates with the Kafka cluster. The IPs returned by ZooKeeper are Kafka IPs,
which are not open to the client. We want to provide all access
through the proxy only.

Please let me know how we can use a Kafka cluster behind a proxy.
--  
Thanks,  
Ankit Jain  


Re: Controlled Shutdown Tool?

2015-07-27 Thread Sriharsha Chintalapani
controlled.shutdown is built into the broker. When this config is set to true, it
makes a request to the controller to initiate the controlled shutdown, waits until
the request succeeds, and in case of failure retries the shutdown
controlled.shutdown.max.retries times.

https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaServer.scala#L175
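
In server.properties that looks like the following (a sketch showing only the two
settings named above, with their defaults):

    controlled.shutdown.enable=true
    controlled.shutdown.max.retries=3

With this enabled, stopping the broker normally (e.g. bin/kafka-server-stop.sh,
which sends SIGTERM) triggers the controlled-shutdown path before the process
exits.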

-- 
Harsha


On July 27, 2015 at 11:50:27 AM, Andrew Otto (ao...@wikimedia.org) wrote:

Thanks!

But how do I initiate a controlled shutdown on a running broker? Editing
server.properties is not going to cause this to happen. Don’t I have to tell
the broker to shut down nicely? All I really want to do is tell the controller
to move leadership to other replicas, so I can shut down the broker without
clients getting all confused.


On Jul 27, 2015, at 14:48, Sriharsha Chintalapani ka...@harsha.io wrote:

You can set controlled.shutdown.enable to true in Kafka’s server.properties;
this is enabled by default from 0.8.2 onwards.
You can also set the max retries using controlled.shutdown.max.retries, which
defaults to 3.


Thanks,
Harsha


On July 27, 2015 at 11:42:32 AM, Andrew Otto (ao...@wikimedia.org) wrote:

I’m working on packaging 0.8.2.1 for Wikimedia, and in doing so I’ve noticed 
that kafka.admin.ShutdownBroker doesn’t exist anymore. From what I can tell, 
this has been intentionally removed in favor of a JMX(?) config 
“controlled.shutdown.enable”. It is unclear from the documentation how one is 
supposed to set this for a running broker. Do I need a special JMX tool in 
order to flick this switch? I’d like to add a command to my kafka bin wrapper 
script so that I can easily use this when restarting brokers.

What is the proper way to set controlled.shutdown.enable?

Thanks!
-Andrew Otto





Re: Controlled Shutdown Tool?

2015-07-27 Thread Sriharsha Chintalapani
You can set controlled.shutdown.enable to true in Kafka’s server.properties;
this is enabled by default from 0.8.2 onwards.
You can also set the max retries using controlled.shutdown.max.retries, which
defaults to 3.


Thanks,
Harsha


On July 27, 2015 at 11:42:32 AM, Andrew Otto (ao...@wikimedia.org) wrote:

I’m working on packaging 0.8.2.1 for Wikimedia, and in doing so I’ve noticed 
that kafka.admin.ShutdownBroker doesn’t exist anymore. From what I can tell, 
this has been intentionally removed in favor of a JMX(?) config 
“controlled.shutdown.enable”. It is unclear from the documentation how one is 
supposed to set this for a running broker. Do I need a special JMX tool in 
order to flick this switch? I’d like to add a command to my kafka bin wrapper 
script so that I can easily use this when restarting brokers.  

What is the proper way to set controlled.shutdown.enable?  

Thanks!  
-Andrew Otto  




Re: Implementing a custom partitioner

2015-07-21 Thread Sriharsha Chintalapani
Hi,
     Are you using the latest trunk for the Producer API? Did you implement the
interface here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
-- 
Harsha


On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Hi all,  

If I wanted to write my own partitioner, all I would need to do is write a  
class that extends Partitioner and override the partition function,  
correct? I am currently doing so, at least, with a class in the package  
'services', yet when I use:  

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple  
switch statement, so I am led to believe that I have not improperly written  
my partitioner. After attempting to debug the issue, I  
notice that the constructor I'm entering when attempting to instantiate the  
producer has the line:  

this.partitioner = new Partitioner();  

which more or less ignores my input. Any ideas? Help is appreciated!  




--  

Jiefu Gong  
University of California, Berkeley | Class of 2017  
B.A Computer Science | College of Letters and Sciences  

jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427  


Re: Implementing a custom partitioner

2015-07-21 Thread Sriharsha Chintalapani
If you are using the new producer API from Kafka 0.8.2, there is no pluggable
partitioner in it; for that you need to use the latest trunk. But in 0.8.2, if
you are using the old producer code, you can implement a pluggable partitioner
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/ProducerConfig.scala#L69
by implementing this interface
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Partitioner.scala

and it gets created here
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/Producer.scala#L61
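
A minimal sketch of such a partitioner for the old producer. Class and package
names are hypothetical; the constructor taking VerifiableProperties is required
because the old producer instantiates the partitioner reflectively with the
producer config:

    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;

    public class MyPartitioner implements Partitioner {
        // the old producer constructs the partitioner reflectively, passing the props
        public MyPartitioner(VerifiableProperties props) { }

        @Override
        public int partition(Object key, int numPartitions) {
            // mask the sign bit so the result is always a valid partition index
            return (key.hashCode() & 0x7fffffff) % numPartitions;
        }
    }

It is then wired up via the partitioner.class property exactly as in the original
question.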

Thanks,
Harsha


On July 21, 2015 at 2:54:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Sriharsha, thanks for your response. I'm using version 0.8.2, and I am
implementing kafka.producer.Partitioner.

I noticed that in the latest trunk the line I specified above is replaced with:

this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

does this mean I cannot use my own partitioner in v0.8.2?


On Tue, Jul 21, 2015 at 2:48 PM, Sriharsha Chintalapani ka...@harsha.io wrote:
Hi,
     Are you using the latest trunk for the Producer API? Did you implement the
interface here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
-- 
Harsha


On July 21, 2015 at 2:27:05 PM, JIEFU GONG (jg...@berkeley.edu) wrote:

Hi all,

If I wanted to write my own partitioner, all I would need to do is write a
class that extends Partitioner and override the partition function,
correct? I am currently doing so, at least, with a class in the package
'services', yet when I use:

properties.put("partitioner.class", "services.myPartitioner");

and instantiate my producer, this doesn't work properly. I'm using a simple
switch statement, so I am led to believe that I have not improperly written
my partitioner. After attempting to debug the issue, I
notice that the constructor I'm entering when attempting to instantiate the
producer has the line:

this.partitioner = new Partitioner();

which more or less ignores my input. Any ideas? Help is appreciated!




--

Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences

jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427



--
Jiefu Gong
University of California, Berkeley | Class of 2017
B.A Computer Science | College of Letters and Sciences
jg...@berkeley.edu | (925) 400-3427


Re: Dropping support for Scala 2.9.x

2015-07-08 Thread Sriharsha Chintalapani
I am +1 on dropping 2.9.x support.

Thanks, 
Harsha


On July 8, 2015 at 7:08:12 AM, Ismael Juma (mli...@juma.me.uk) wrote:

Hi,  

The responses in this thread were positive, but there weren't many. A few  
months passed and Sriharsha encouraged me to reopen the thread given that  
the 2.9 build has been broken for at least a week[1] and no-one seemed to  
notice.  

Do we want to invest more time so that the 2.9 build continues to work or  
do we want to focus our efforts on 2.10 and 2.11? Please share your opinion.  

Best,  
Ismael  

[1] https://issues.apache.org/jira/browse/KAFKA-2325  

On Fri, Mar 27, 2015 at 2:20 PM, Ismael Juma mli...@juma.me.uk wrote:  

 Hi all,  
  
 The Kafka build currently includes support for Scala 2.9, which means that  
 it cannot take advantage of features introduced in Scala 2.10 or depend on  
 libraries that require it.  
  
 This restricts the solutions available while trying to solve existing  
 issues. I was browsing JIRA looking for areas to contribute and I quickly  
 ran into two issues where this is the case:  
  
 * KAFKA-1351 (“String.format is very expensive in Scala”) could be solved
 nicely by using the string interpolation feature introduced in Scala 2.10.
  
 * KAFKA-1595 (“Remove deprecated and slower scala JSON parser from
 kafka.consumer.TopicCount”) could be solved by using an existing JSON
 library, but both jackson-scala and play-json require 2.10 (argonaut  
 supports Scala 2.9, but it brings other dependencies like scalaz). We can  
 workaround this by writing our own code instead of using libraries, of  
 course, but it's not ideal.  
  
 Other features like Scala Futures and value classes would also be useful  
 in some situations, I would think (for a more extensive list of new  
 features, see  
 http://scala-language.1934581.n4.nabble.com/Scala-2-10-0-now-available-td4634126.html
   
 ).  
  
 Another pain point of supporting 2.9.x is that it doubles the number of  
 build and test configurations required from 2 to 4 (because the 2.9.x  
 series was not necessarily binary compatible).  
  
 A strong argument for maintaining support for 2.9.x was the client  
 library, but that has been rewritten in Java.  
  
 It's also worth mentioning that Scala 2.9.1 was released in August 2011  
 (more than 3.5 years ago) and the 2.9.x series hasn't received updates of  
 any sort since early 2013. Scala 2.10.0, in turn, was released in January  
 2013 (over 2 years ago) and 2.10.5, the last planned release in the 2.10.x  
 series, has been recently released (so even 2.10.x won't be receiving  
 updates any longer).  
  
 All in all, I think it would not be unreasonable to drop support for Scala  
 2.9.x in a future release, but I may be missing something. What do others  
 think?  
  
 Ismael  
  


Re: latest kafka consumer api maven location

2015-06-21 Thread Sriharsha Chintalapani
Sushant,
     You are using the kafka-clients new consumer API. It looks like you want to
use the high-level consumer API? If so, you need to use the following Kafka core
lib as the dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>

More details on this page:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
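
A minimal high-level consumer along the lines of that wiki page. This is a
sketch; the ZooKeeper address, group id, and topic are placeholders:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class HighLevelConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            // the high-level consumer talks to ZooKeeper, not to brokers directly
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "my-group");
            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // one stream (thread) for the topic
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
            ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
            while (it.hasNext())
                System.out.println(new String(it.next().message()));
        }
    }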
-- 
Harsha


On June 21, 2015 at 8:02:01 AM, Shushant Arora (shushantaror...@gmail.com) 
wrote:

Which is the latest jar to be used for the Kafka Java client?

As in:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.1</version>
</dependency>

In class org.apache.kafka.clients.consumer.KafkaConsumer:

public Map<String, ConsumerRecords<K,V>> poll(long timeout) {
// TODO Auto-generated method stub
return null;
}

The poll method returns null. I want to use a high-level Java consumer.

And why in org.apache.kafka.clients.consumer.ConsumerConfig is the configuration
for
public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
and not for zookeeper.connect? If in the high-level consumer offsets are
maintained by ZooKeeper,
then why is a broker address required?


Re: Log file of server start up error

2015-05-26 Thread Sriharsha Chintalapani
       It looks like you have another Kafka broker process running. Stop
that broker and start a new one.
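
A quick way to find the offending process, assuming the default log dir from the
error below (/tmp/kafka-logs):

    # show the process holding the lock file
    lsof /tmp/kafka-logs/.lock
    # or look for a running broker directly
    ps aux | grep kafka.Kafka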

-- 
Harsha


On May 24, 2015 at 11:05:27 PM, Sanjay Mistry (mistrysanja...@gmail.com) wrote:

Hello,  

When I run the command bin/kafka-console-producer.sh --broker-list
localhost:9092 --topic testing, I am getting an INFO Closing socket
connection to /127.0.0.1. (kafka.network.Processor) message in the server console.

Please help me.  


On Sat, May 23, 2015 at 12:19 PM, Sanjay Mistry mistrysanja...@gmail.com  
wrote:  

 [2015-05-23 12:16:41,624] INFO Initiating client connection,  
 connectString=localhost:2181 sessionTimeout=6000  
 watcher=org.I0Itec.zkclient.ZkClient@70808f4e  
 (org.apache.zookeeper.ZooKeeper)  
 [2015-05-23 12:16:41,659] INFO Opening socket connection to server  
 localhost/0:0:0:0:0:0:0:1:2181 (org.apache.zookeeper.ClientCnxn)  
 [2015-05-23 12:16:41,673] INFO Socket connection established to  
 localhost/0:0:0:0:0:0:0:1:2181, initiating session  
 (org.apache.zookeeper.ClientCnxn)  
 [2015-05-23 12:16:41,740] INFO Session establishment complete on server  
 localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x14d7f85f7a3, negotiated  
 timeout = 6000 (org.apache.zookeeper.ClientCnxn)  
 [2015-05-23 12:16:41,743] INFO zookeeper state changed (SyncConnected)  
 (org.I0Itec.zkclient.ZkClient)  
 [2015-05-23 12:16:42,015] FATAL Fatal error during KafkaServerStable  
 startup. Prepare to shutdown (kafka.server.KafkaServerStartable)  
 kafka.common.KafkaException: Failed to acquire lock on file .lock in  
 /tmp/kafka-logs. A Kafka instance in another process or thread is using  
 this directory.  
 at  
 kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95)  
 at  
 kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:92)  
 at  
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
   
 at  
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
   
 at  
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
   
 at  
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:33)  
 at  
 scala.collection.TraversableLike$class.map(TraversableLike.scala:233)  
 at scala.collection.mutable.WrappedArray.map(WrappedArray.scala:33)  
 at kafka.log.LogManager.lockLogDirs(LogManager.scala:92)  
 at kafka.log.LogManager.init(LogManager.scala:55)  
 at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)  
 at kafka.server.KafkaServer.startup(KafkaServer.scala:72)  
 at  
 kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)  
 at kafka.Kafka$.main(Kafka.scala:46)  
 at kafka.Kafka.main(Kafka.scala)  
 [2015-05-23 12:16:42,023] INFO [Kafka Server 0], shutting down  
 (kafka.server.KafkaServer)  
 [2015-05-23 12:16:42,030] INFO Terminate ZkClient event thread.  
 (org.I0Itec.zkclient.ZkEventThread)  
 [2015-05-23 12:16:42,036] INFO EventThread shut down  
 (org.apache.zookeeper.ClientCnxn)  
 [2015-05-23 12:16:42,037] INFO Session: 0x14d7f85f7a3 closed  
 (org.apache.zookeeper.ZooKeeper)  
 [2015-05-23 12:16:42,038] INFO [Kafka Server 0], shut down completed  
 (kafka.server.KafkaServer)  
 [2015-05-23 12:16:42,040] INFO [Kafka Server 0], shutting down  
 (kafka.server.KafkaServer)  
  
  


Re: Delete topic pending

2015-05-18 Thread Sriharsha Chintalapani
Maybe that's a bug. We should probably alert the user that the topic doesn’t
exist rather than go and add it to ZooKeeper to trigger deletion. Also, to
delete any topic you need to set “delete.topic.enable” to true.
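
Concretely, a sketch with a placeholder topic name:

    # server.properties
    delete.topic.enable=true

    # then mark the topic for deletion with the stock topics tool
    bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic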

-- 
Harsha


On May 18, 2015 at 10:16:46 AM, Dillian Murphey (crackshotm...@gmail.com) wrote:

If a broker doesn't have the topic and I run delete topic, that topic will
be in a pending-delete state forever.

What am I doing wrong here?

Also, what if I have data loss and I just want to delete the dang topic
from ZooKeeper directly, with none of this pending stuff?

Thanks  


RE: Support https or ssl

2015-05-08 Thread Sriharsha Chintalapani
Jamie,
      Once the patch gets in, you can configure Kafka brokers with SSL configs,
and similarly you can configure the producer and consumer with SSL; both will
negotiate an SSL connection, and once it's established the rest of the
communication will happen over SSL. I am not sure about the timeline, but we are
working to get this in asap.
In my previous email I was talking about
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST or
https://github.com/confluentinc/kafka-rest,
fronted with an HTTP server that accepts SSL connections. It's a bit of
indirection, and the end connections to the Kafka brokers won’t be SSL. If that's
your use case, I would suggest you wait until these patches get in.
Thanks,
Harsha


On May 8, 2015 at 1:35:17 PM, Jamie Wang (jamie.w...@actuate.com) wrote:

Thank you Harsha for the reply. Will the patch work with 0.8.2.1? So the
general logic is that we write an agent that sends messages via HTTPS, and then
maybe a servlet using the Kafka producer class to authenticate with the Kafka
server on the SSL port, and so on? When will the full implementation be available
for GA? Thanks again.

Jamie  

-Original Message-  
From: Sriharsha Chintalapani [mailto:ka...@harsha.io]  
Sent: Thursday, May 07, 2015 7:22 PM  
To: Jamie Wang; users@kafka.apache.org  
Subject: Re: Support https or ssl  

Hi Jamie,  
        I am currently working on providing SSL support for Kafka. Here are the
JIRAs: https://issues.apache.org/jira/browse/KAFKA-1690 and
https://issues.apache.org/jira/browse/KAFKA-1684 . If you are using a REST API to
front the Kafka producer, then you can probably make that HTTP server be on SSL.
--   
Harsha  


On May 7, 2015 at 7:07:58 PM, Jamie Wang (jamie.w...@actuate.com) wrote:  

Hello,  

It's been a while since my team worked on a Kafka-related project. Btw, the
previous project using Kafka worked wonderfully for us. Now I have a requirement
to use HTTPS or SSL. I am wondering if the latest version has support for SSL. If
not, what is the timeline for this functionality to be supported, and is there
any suggestion on what I can do in the interim to provide similar functionality
using Kafka? Thank you in advance for your time and help.

Jamie  


Re: Support https or ssl

2015-05-07 Thread Sriharsha Chintalapani
Hi Jamie,
        I am currently working on providing SSL support for Kafka. Here are the
JIRAs: https://issues.apache.org/jira/browse/KAFKA-1690 and
https://issues.apache.org/jira/browse/KAFKA-1684 . If you are using a REST API to
front the Kafka producer, then you can probably make that HTTP server be on SSL.
-- 
Harsha


On May 7, 2015 at 7:07:58 PM, Jamie Wang (jamie.w...@actuate.com) wrote:

Hello,  

It's been a while since my team worked on a Kafka-related project. Btw, the
previous project using Kafka worked wonderfully for us. Now I have a requirement
to use HTTPS or SSL. I am wondering if the latest version has support for SSL. If
not, what is the timeline for this functionality to be supported, and is there
any suggestion on what I can do in the interim to provide similar functionality
using Kafka? Thank you in advance for your time and help.

Jamie  


Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Sriharsha Chintalapani
Hi Jay,
         Sorry about the KIP formatting; I fixed those issues in the KIP.

2. We certainly need to add both the serialized and unserialized form for 
the key as both are useful. 
I added those to the interface.

3. Do we need to add the value? I suspect people will have uses for 
computing something off a few fields in the value to choose the partition. 
This would be useful in cases where the key was being used for log 
compaction purposes and did not contain the full information for computing 
the partition. 
added it as well.

4. This interface doesn't include either an init() or close() method. It 
should implement Closable and Configurable, right? 
I am not quite sure about having init() or close() for the partitioner. Are we
looking at the partitioner using some external resources to initialize and close?
If that's the case, then init() should also take in some config as a param; this
can add more complexity.



5. What happens if the user both sets the partition id in the 
ProducerRecord and sets a partitioner? Does the partition id just get 
passed in to the partitioner (as sort of implied in this interface?). This 
is a bit weird since if you pass in the partition id you kind of expect it 
to get used, right? Or is it the case that if you specify a partition the 
partitioner isn't used at all (in which case no point in including 
partition in the Partitioner api). 
In the current ProducerRecord the partition id is passed to the Partitioner. If a
custom partitioner is not going to use it, that's up to their
implementation, right? Similarly, our interface has Value as another param
that may or may not be used. Essentially it's up to the Partitioner to disclose
what available information it is going to partition against.
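
Putting the points in this thread together, the shape under discussion is roughly
the following. This is a sketch assumed from the discussion, not the final KIP
text, and the parameter names are hypothetical:

    // hypothetical sketch of the partition callback being discussed
    public interface Partitioner {
        // key and value in both deserialized and serialized form, plus the
        // partition id from the ProducerRecord (null when the user didn't set one)
        int partition(String topic, Object key, byte[] keyBytes,
                      Object value, byte[] valueBytes,
                      Integer partition, int numPartitions);
    }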


Thanks,
Harsha


On April 23, 2015 at 9:11:33 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,  

A few comments:  

Can you finish up the KIP? There are some unfinished sentences and odd
whitespace things going on.

Here are the questions I think we should consider:  
1. Do we need this at all given that we have the partition argument in  
ProducerRecord which gives full control? I think we do need it because this  
is a way to plug in a different partitioning strategy at run time and do it  
in a fairly transparent way.  
2. We certainly need to add both the serialized and unserialized form for  
the key as both are useful.  
3. Do we need to add the value? I suspect people will have uses for  
computing something off a few fields in the value to choose the partition.  
This would be useful in cases where the key was being used for log  
compaction purposes and did not contain the full information for computing  
the partition.  
4. This interface doesn't include either an init() or close() method. It  
should implement Closable and Configurable, right?  
5. What happens if the user both sets the partition id in the  
ProducerRecord and sets a partitioner? Does the partition id just get  
passed in to the partitioner (as sort of implied in this interface?). This  
is a bit weird since if you pass in the partition id you kind of expect it  
to get used, right? Or is it the case that if you specify a partition the  
partitioner isn't used at all (in which case no point in including  
partition in the Partitioner api).  

Cheers,  

-Jay  

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani ka...@harsha.io  
wrote:  

 Hi,  
 Here is the KIP for adding a partitioner interface for producer.  
  
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
   
 There is one open question about how interface should look like. Please  
 take a look and let me know if you prefer one way or the other.  
  
 Thanks,  
 Harsha  
  
  


[KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Sriharsha Chintalapani
Hi,
Here is the KIP for adding a partitioner interface for producer.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
There is one open question about how the interface should look. Please take a
look and let me know if you prefer one way or the other.

Thanks,
Harsha



Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-22 Thread Sriharsha Chintalapani
1. *Support for running potentially long-running delegated tasks outside 
the network thread*: It is recommended that delegated tasks indicated by 
a handshake status of NEED_TASK are run on a separate thread since they may 
block ( 
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). 
It is easier to encapsulate this in SSLChannel without any changes to 
common code if selection keys are managed within the Channel. 


 This makes sense; I can change the code to not do it on the network thread.

Right now we are doing the handshake as part of the processor (it shouldn’t be
in the acceptor), and we have multiple processor threads. Do we still see this as
an issue if it happens on the same thread as the processor?
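
For reference, handing NEED_TASK work to a worker thread typically looks
something like the following. This is a sketch against the standard
javax.net.ssl.SSLEngine API; delegatedTaskExecutor is an assumed ExecutorService:

    // during the handshake loop, off-load SSLEngine's delegated tasks
    // instead of running them on the processor/network thread
    if (sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NEED_TASK) {
        Runnable task;
        while ((task = sslEngine.getDelegatedTask()) != null) {
            delegatedTaskExecutor.submit(task);
        }
    }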





-- 
Harsha
Sent with Airmail

On April 22, 2015 at 7:18:17 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) 
wrote:

Hi Rajini,
       Thanks for the details. I did go through your code. There was a
discussion before about not having selector-related code in the channel or
extending the selector itself.

1. *Support for running potentially long-running delegated tasks outside 
the network thread*: It is recommended that delegated tasks indicated by 
a handshake status of NEED_TASK are run on a separate thread since they may 
block ( 
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html). 
It is easier to encapsulate this in SSLChannel without any changes to 
common code if selection keys are managed within the Channel. 


 This makes sense; I can change the code to not do it on the network thread.



2. *Renegotiation handshake*: During a read operation, handshake status 
may indicate that renegotiation is required. It will be good to encapsulate 
this state change (and any knowledge of these SSL-specific state 
transitions) within SSLChannel. Our experience was that managing keys and 
state within the SSLChannel rather than in Selector made this code neater. 
Do we even want to support renegotiation? This is the case where a user/client
handshakes with the server anonymously

but later wants to present their identity and establish a new SSL
session. Our producers and consumers either present their identity (two-way
auth) or not. Since these are long-running processes, I don’t see a case where
they initially establish the session and later present
their identity.



*Graceful shutdown of the SSL connections*: Our experience was that 
we could encapsulate all of the logic for shutting down SSLEngine 
gracefully within SSLChannel when the selection key and state are owned and 
managed by SSLChannel. 

Can’t this be done when channel.close() is called? Is there any reason to own the
selection key?

4. *And finally a minor point:* We found that by managing selection key 
and selection interests within SSLChannel, protocol-independent Selector 
didn't need the concept of handshake at all and all channel state 
management and handshake related code could be held in protocol-specific 
classes. This may be worth taking into consideration since it makes it 
easier for common network layer code to be maintained without any 
understanding of the details of individual security protocols. 
The only thing the network code (SocketServer) is aware of is whether the
channel's handshake is complete; if it's not, it does the handshake

or else goes about reading/writing from the channel. Yes, SocketServer needs to
know whether a channel is ready to read or not. But on the other hand,

there aren't too many details of the handshake leaked into SocketServer. Either we
let the server know that a channel needs a handshake, or we keep the selection-key
state in the channel, which means we are adding selector-related code into the
channel.



Thanks,
Harsha


On April 22, 2015 at 3:56:04 AM, Rajini Sivaram (rajinisiva...@googlemail.com) 
wrote:

When we were working on the client-side SSL implementation for Kafka, we
found that returning selection interest from handshake() method wasn't
sufficient to handle some of the SSL sequences. We resorted to managing the
selection key and interest state within SSLChannel to avoid SSL-specific
knowledge escaping out of SSL classes into protocol-independent network
code. The current server-side SSL patch doesn't address these scenarios
yet, but we may want to take these into account while designing the common
Channel class/interface.

1. *Support for running potentially long-running delegated tasks outside
the network thread*: It is recommended that delegated tasks indicated by
a handshake status of NEED_TASK are run on a separate thread since they may
block (
http://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLEngine.html).
It is easier to encapsulate this in SSLChannel without any changes to
common code if selection keys are managed within the Channel.
2. *Renegotiation handshake*: During a read operation, handshake status
may indicate that renegotiation is required. It will be good to encapsulate
this state change (and any knowledge of these SSL-specific state
transitions) within

Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-22 Thread Sriharsha Chintalapani
  
understanding of the details of individual security protocols.  

The channel classes we used are included in the patch in  
https://issues.apache.org/jira/browse/KAFKA-1690. The patch contains unit  
tests to validate these scenarios as well as other buffer overflow  
conditions which may be useful for server-side code when the scenarios  
described above are implemented.  
Regards,  

Rajini  



On Tue, Apr 21, 2015 at 11:13 PM, Sriharsha Chintalapani   
harsh...@fastmail.fm wrote:  

 Hi Jay,  
 Thanks for the review.  
  
 1. Isn't the blocking handshake going to be a performance concern? Can  
 we  
 do the handshake non-blocking instead? If anything that causes connections  
 to drop can incur blocking network roundtrips won't that eat up all the  
 network threads immediately? I guess I would have to look at that code to  
 know...  
 I’ve non-blocking handshake on the server side as well as for new  
 producer client. Blocking handshake is only done for BlockingChannel.scala  
 and it just loops over the non-blocking hand shake until the context is  
 established. So on the server side (SocketServer.scala) as it goes through  
 the steps and returns “READ or WRITE” signal for next step. For  
 BlockingChannel the worst case I look at is the connection timeout but most  
 times this handshake will finish up much quicker . I am cleaning up the  
 code will send up a patch in next few days .  
  
 2. Do we need to support blocking channel at all? That is just for the old  
 clients, and I think we should probably just leave those be to reduce  
 scope  
 here.  
 So blocking channel used not only by simple consumer but also  
 ControllerChannelManager and controlled shutdown also. Are we planning on  
 deprecating it. I think at least for ControllerChannelManager it makes  
 sense to have a blocking channel. If the users want to lock down the  
 cluster i.e no PLAINTEXT channels are allowed than all the communication  
 has to go through either SSL and KERBEROS so in this case we need add this  
 capability to BlockingChannel.  
  
  
  
 3. Can we change the APIs to drop the getters when that is not required by  
 the API being implemented. In general we don't use setters and getters as  
 a  
 naming convention.  
  
 My bad on adding getters and setters :). I’ll work on removing it and  
 change the KIP accordingly. I still need some accessor methods though .  
  
 Thanks,  
  
 Harsha  
  
  
  
 On April 21, 2015 at 2:51:15 PM, Jay Kreps (jay.kr...@gmail.com) wrote:  
  
 Hey Sriharsha,  
  
 Thanks for the excellent write-up.  
  
 Couple of minor questions:  
  
 1. Isn't the blocking handshake going to be a performance concern? Can we  
 do the handshake non-blocking instead? If anything that causes connections  
 to drop can incur blocking network roundtrips won't that eat up all the  
 network threads immediately? I guess I would have to look at that code to  
 know...  
  
 2. Do we need to support blocking channel at all? That is just for the old  
 clients, and I think we should probably just leave those be to reduce scope  
 here.  
  
 3. Can we change the APIs to drop the getters when that is not required by  
 the API being implemented. In general we don't use setters and getters as a  
 naming convention.  
  
 The long explanation on that is that setters/getters kind of imply a style  
 of java programming where you have simple structs with getters and setters  
 for each field. In general we try to have access methods only when  
 necessary, and rather than setters model the full change or action being  
 carried out, and if possible disallow change entirely. This is more in line  
 with modern java style I think. We aren't perfect in following this, but  
 once you start with getters and setters people start just adding them  
 everywhere and then using them.  
  
 -Jay  
  
  
 On Mon, Apr 20, 2015 at 10:42 AM, Sriharsha Chintalapani ka...@harsha.io  
 wrote:  
  
  Hi,  
  I updated the KIP-12 with more details. Please take a look  
   
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888  
   
  Thanks,  
  Harsha  
   
   
  On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:  
   
  Thanks Joe. It will be part of KafkaServer and will run on its own  
  thread. Since each kafka server will run with a keytab we should make  
  sure they are all getting renewed.  
   
  On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:  
   Thanks Harsha, looks good so far. How were you thinking of running  
   the KerberosTicketManager as a standalone process or like controller or  
   is  
   it a layer of code that does the plumbing pieces everywhere?  

   ~ Joestein  

   On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:  

Hi,  
Here is the initial proposal for sasl/kerberos implementation for  
kafka https://cwiki.apache.org/confluence/x/YI4WAw  
and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am

Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-21 Thread Sriharsha Chintalapani
Hi Jay,
      Thanks for the review. 

   1. Isn't the blocking handshake going to be a performance concern? Can we 
do the handshake non-blocking instead? If anything that causes connections 
to drop can incur blocking network roundtrips won't that eat up all the 
network threads immediately? I guess I would have to look at that code to 
know... 
        I’ve a non-blocking handshake on the server side as well as for the new
producer client. A blocking handshake is only done for BlockingChannel.scala, and
it just loops over the non-blocking handshake until the context is
established. So on the server side (SocketServer.scala) it goes through the
steps and returns a “READ or WRITE” signal for the next step. For BlockingChannel
the worst case I look at is the connection timeout, but most times this
handshake will finish up much quicker. I am cleaning up the code and will send up
a patch in the next few days.

2. Do we need to support blocking channel at all? That is just for the old 
clients, and I think we should probably just leave those be to reduce scope 
here. 
The blocking channel is used not only by the simple consumer but also by
ControllerChannelManager and controlled shutdown. Are we planning on
deprecating it? I think at least for ControllerChannelManager it makes sense
to have a blocking channel. If users want to lock down the cluster, i.e. no
PLAINTEXT channels are allowed, then all the communication has to go through
either SSL or KERBEROS, so in this case we need to add this capability to
BlockingChannel.



3. Can we change the APIs to drop the getters when that is not required by 
the API being implemented. In general we don't use setters and getters as a 
naming convention. 

My bad on adding getters and setters :). I’ll work on removing them and change
the KIP accordingly. I still need some accessor methods though.

Thanks,

Harsha



On April 21, 2015 at 2:51:15 PM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Sriharsha,  

Thanks for the excellent write-up.  

Couple of minor questions:  

1. Isn't the blocking handshake going to be a performance concern? Can we  
do the handshake non-blocking instead? If anything that causes connections  
to drop can incur blocking network roundtrips won't that eat up all the  
network threads immediately? I guess I would have to look at that code to  
know...  

2. Do we need to support blocking channel at all? That is just for the old  
clients, and I think we should probably just leave those be to reduce scope  
here.  

3. Can we change the APIs to drop the getters when that is not required by  
the API being implemented. In general we don't use setters and getters as a  
naming convention.  

The long explanation on that is that setters/getters kind of imply a style  
of java programming where you have simple structs with getters and setters  
for each field. In general we try to have access methods only when  
necessary, and rather than setters model the full change or action being  
carried out, and if possible disallow change entirely. This is more in line  
with modern java style I think. We aren't perfect in following this, but  
once you start with getters and setters people start just adding them  
everywhere and then using them.  

-Jay  


On Mon, Apr 20, 2015 at 10:42 AM, Sriharsha Chintalapani ka...@harsha.io  
wrote:  

 Hi,  
 I updated the KIP-12 with more details. Please take a look  
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888  
  
 Thanks,  
 Harsha  
  
  
 On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:  
  
 Thanks Joe. It will be part of KafkaServer and will run on its own  
 thread. Since each kafka server will run with a keytab we should make  
 sure they are all getting renewed.  
  
 On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:  
  Thanks Harsha, looks good so far. How were you thinking of running  
  the KerberosTicketManager as a standalone process or like controller or  
  is  
  it a layer of code that does the plumbing pieces everywhere?  
   
  ~ Joestein  
   
  On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:  
   
   Hi,  
   Here is the initial proposal for sasl/kerberos implementation for  
   kafka https://cwiki.apache.org/confluence/x/YI4WAw  
   and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am  
   currently working on prototype which will add more details to the KIP.  
   Just opening the thread to say the work is in progress. I'll update the  
   thread with a initial prototype patch.  
   Thanks,  
   Harsha  

  


Re: Topic existence Java API

2015-04-20 Thread Sriharsha Chintalapani
Mingtao,
          I think you are looking at the Scala version, 2.8.2. Check your Kafka
version in the artifact name, kafka_2.8.2-${version}. In 0.8.2 we have
AdminUtils.topicExists(zkClient, topic).
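
From Java that check looks roughly like this. A sketch assuming the 0.8.2 admin
API and a placeholder ZooKeeper address:

    import kafka.admin.AdminUtils;
    import kafka.utils.ZKStringSerializer$;
    import org.I0Itec.zkclient.ZkClient;

    // Kafka stores znode payloads as strings, so use its string serializer
    ZkClient zkClient = new ZkClient("localhost:2181", 10000, 10000,
            ZKStringSerializer$.MODULE$);
    boolean exists = AdminUtils.topicExists(zkClient, "my-topic");
    zkClient.close();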

Thanks,
Harsha


On April 20, 2015 at 2:20:27 PM, Mingtao Zhang (mail2ming...@gmail.com) wrote:

Hi,  

I am on Kafka 2.8.2, looking for a way to check whether a topic exists or
not in Java.

I saw there is a method called listTopics - TopicCommand.listTopics, but  
the return type is void.  

Is there an API to check topic existence? (preferred arguments: (zkconnect,
topic))

Thanks in advance!  

Best Regards,  
Mingtao  


Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-20 Thread Sriharsha Chintalapani
Hi Jun,
           I am using the underlying protocol, GSS-API, which SASL also uses. I
can add details about LDAP/AD. For AD, this is in general the integration of
AD with KERBEROS, i.e. Kerberos can talk to AD to get the kinit login
credentials (more of a setup detail between Kerberos and AD). For LDAP,
GSS-API allows you to do DIGEST auth as well. I’ll add the details regarding
both of these.
       For SSL support I’ll add the details to the same KIP, as they both extend
the same Channel and share some of the implementation.

Thanks,
Harsha


On April 20, 2015 at 12:31:12 PM, Jun Rao (j...@confluent.io) wrote:

Hi, Harsha,  

For SASL, a common use case is the integration with LDAP/AD. For  
completeness, could you describe (or provide a link) how such integration  
can be done?  

Also, what about the SSL support? Do you plan to describe it in the same
KIP or a separate one?

Thanks,  

Jun  

On Mon, Apr 20, 2015 at 12:42 PM, Sriharsha Chintalapani ka...@harsha.io  
wrote:  

 Hi,  
 I updated the KIP-12 with more details. Please take a look  
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888  
  
 Thanks,  
 Harsha  
  
  
 On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:  
  
 Thanks Joe. It will be part of KafkaServer and will run on its own  
 thread. Since each kafka server will run with a keytab we should make  
 sure they are all getting renewed.  
  
 On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:  
  Thanks Harsha, looks good so far. How were you thinking of running  
  the KerberosTicketManager as a standalone process or like controller or  
  is  
  it a layer of code that does the plumbing pieces everywhere?  
   
  ~ Joestein  
   
  On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:  
   
   Hi,  
   Here is the initial proposal for sasl/kerberos implementation for  
   kafka https://cwiki.apache.org/confluence/x/YI4WAw  
   and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am  
   currently working on prototype which will add more details to the KIP.  
   Just opening the thread to say the work is in progress. I'll update the  
   thread with a initial prototype patch.  
   Thanks,  
   Harsha  

  


Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-20 Thread Sriharsha Chintalapani
Hi,
     I updated the KIP-12 with more details. Please take a look  
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888

Thanks,
Harsha


On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:

Thanks Joe. It will be part of KafkaServer and will run on its own  
thread. Since each kafka server will run with a keytab we should make  
sure they are all getting renewed.  

On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:  
 Thanks Harsha, looks good so far. How were you thinking of running  
 the KerberosTicketManager as a standalone process or like controller or  
 is  
 it a layer of code that does the plumbing pieces everywhere?  
  
 ~ Joestein  
  
 On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:  
  
  Hi,  
  Here is the initial proposal for sasl/kerberos implementation for  
  kafka https://cwiki.apache.org/confluence/x/YI4WAw  
  and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am  
  currently working on prototype which will add more details to the KIP.  
  Just opening the thread to say the work is in progress. I'll update the  
  thread with a initial prototype patch.  
  Thanks,  
  Harsha