Re: java api code and javadoc

2014-10-16 Thread Ewen Cheslack-Postava
KafkaStream and MessageAndOffset are Scala classes, so you'll find them
under the scaladocs. The ConsumerConnector interface should show up in
the javadocs with good documentation coverage. Some classes like
MessageAndOffset are so simple (just compositions of other data) that
they aren't going to have any docs associated with them.

On Thu, Oct 16, 2014, at 08:03 AM, 4mayank wrote:
 Thanks Joseph. I built the javadoc but it's incomplete.
 Where can I find the code itself for classes like KafkaStream,
 MessageAndOffset, ConsumerConnector etc.?
 
 On Wed, Oct 15, 2014 at 11:10 AM, Joseph Lawson jlaw...@roomkey.com
 wrote:
 
  You probably have to build your own right now.  Check out
  https://github.com/apache/kafka#building-javadocs-and-scaladocs
  
  From: 4mayank 4may...@gmail.com
  Sent: Wednesday, October 15, 2014 11:38 AM
  To: users@kafka.apache.org
  Subject: java api code and javadoc
 
  Hi
 
  I downloaded kafka 0.8.1.1 src and went through some documentation and
  wikis, but could not find any documentation (javadoc or other) on the java
  API - info on classes like SimpleConsumer, MessageAndOffset etc. Nor could
  I locate the source code (.java). I see only scala files.
 
  Can anyone provide info on where I can find doc to get list of attributes,
  methods, signatures etc?
 
  Thanks.
  -Mayank.
 


Re: Too much log for kafka.common.KafkaException

2014-10-18 Thread Ewen Cheslack-Postava
This looks very similar to the error and stacktrace I see when
reproducing https://issues.apache.org/jira/browse/KAFKA-1196 -- that's
an overflow where the data returned in a FetchResponse exceeds 2GB. (It
triggers the error you're seeing because FetchResponse's size overflows
to become negative, which breaks tests for whether data has finished
sending.) I haven't tested against 0.8.1.1, but it looks identical
modulo line #'s. If it's the same issue, unfortunately it won't fix
itself, so that log will just keep growing with more error messages as
the consumer keeps reconnecting, requesting data, then triggering the
error in the broker which forcibly disconnects the consumer.

I'm not certain what to suggest here since KAFKA-1196 still needs a lot
of refinement. But given the 0.8.1.1 code I don't think there's much
choice but to try to reduce the amount of data that will be returned.
One way to do that is to reduce the number of partitions read in the
FetchRequest (i.e. make sure FetchRequests address fewer
TopicAndPartitions, maybe putting each TopicAndPartition in its own
request). An alternative would be to use more recent offsets (i.e. don't
start from the oldest data available in Kafka). A recent enough offset
should result in a response smaller than 2GB.
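
For reference, here is a rough sketch (not from the original message) of that
second workaround using the 0.8.x javaapi SimpleConsumer; the host, topic,
partition, offset, and fetch size below are all placeholders:

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class SingleTopicFetch {
        public static void main(String[] args) {
            // Placeholder connection settings.
            SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "fetch-size-check");

            long recentOffset = 123456L; // placeholder: a recent offset, not the oldest available

            // One TopicAndPartition per request with a bounded per-partition fetch
            // size keeps the total FetchResponse well below the 2GB overflow point.
            FetchRequest request = new FetchRequestBuilder()
                .clientId("fetch-size-check")
                .addFetch("my-topic", 0, recentOffset, 1024 * 1024) // topic, partition, offset, maxBytes
                .build();

            FetchResponse response = consumer.fetch(request);
            if (response.hasError()) {
                System.out.println("Fetch error code: " + response.errorCode("my-topic", 0));
            }
            consumer.close();
        }
    }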
 
-Ewen

On Sat, Oct 18, 2014, at 12:07 AM, xingcan wrote:
 Hi, all
 
 Recently, I upgraded my Kafka cluster to 0.8.1.1 and set replication with
 num.replica.fetchers=5. Last night, there was something wrong with the
 network. Soon, I found the server.log files (not the data log!) on every
 node reached 4GB in an hour.
 I am not sure if it's my inappropriate configuration or some other reason.
 Can anybody help me with this? Thanks~
 
 log file tail
 
 [2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because
 of
 error (kafka.network.Processor)
 kafka.common.KafkaException: This operation cannot be completed on a
 complete request.
 at
 kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
 at
 kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
 at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
 at kafka.network.Processor.write(SocketServer.scala:375)
 at kafka.network.Processor.run(SocketServer.scala:247)
 at java.lang.Thread.run(Thread.java:745)
 [2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.66 because
 of
 error (kafka.network.Processor)
 kafka.common.KafkaException: This operation cannot be completed on a
 complete request.
 at
 kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
 at
 kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
 at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
 at kafka.network.Processor.write(SocketServer.scala:375)
 at kafka.network.Processor.run(SocketServer.scala:247)
 at java.lang.Thread.run(Thread.java:745)
 [2014-10-16 20:59:59,994] ERROR Closing socket for /192.168.1.65 because
 of
 error (kafka.network.Processor)
 kafka.common.KafkaException: This operation cannot be completed on a
 complete request.
 at
 kafka.network.Transmission$class.expectIncomplete(Transmission.scala:34)
 at
 kafka.api.FetchResponseSend.expectIncomplete(FetchResponse.scala:191)
 at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:214)
 at kafka.network.Processor.write(SocketServer.scala:375)
 at kafka.network.Processor.run(SocketServer.scala:247)
 at java.lang.Thread.run(Thread.java:745)
 
 
 
 -- 
 *Xingcan*


Re: Does adding ConsumerTimeoutException make the code more robust?

2014-12-01 Thread Ewen Cheslack-Postava
No, hasNext will return immediately if data is available. The consumer
timeout is only helpful if your application can't safely block on the
iterator indefinitely.
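
For reference, a minimal sketch (not from the original message) of the consumer
config that enables that timeout; the ZooKeeper address, group id, and timeout
value are placeholders:

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class TimeoutConsumerConfig {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "my-group");                // placeholder
            // -1 (the default) means hasNext() blocks indefinitely; any positive value
            // makes it throw ConsumerTimeoutException after that many milliseconds.
            props.put("consumer.timeout.ms", "5000");
            ConsumerConfig config = new ConsumerConfig(props);
        }
    }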

-Ewen

On Sat, Nov 29, 2014, at 08:35 PM, Rahul Amaram wrote:
 Yes, I have configured consumer timeout config.
 
 Let me put my query another way. Is it possible that it.hasNext() could be
 blocked even when there are messages available, in which case using the
 consumer timeout could help?
 
 Thanks,
 Rahul.
 
 On Saturday 29 November 2014 09:56 PM, Jun Rao wrote:
  By default, it.hasNext() blocks when there is no more message to consume.
  So catching ConsumerTimeoutException doesn't make any difference. You only
  need to handle ConsumerTimeoutException if you have customized the consumer
  timeout config.
 
  Thanks,
 
  Jun
 
  On Thu, Nov 27, 2014 at 7:48 AM, Rahul Amaram rahul.ama...@vizury.com
  wrote:
 
  Hi,
 
  I am just wondering if the below snippet
 
  ConsumerIterator<byte[], byte[]> it = ...
 
  while (true) {
    try {
      while (it.hasNext()) {
        ...
      }
    } catch (ConsumerTimeoutException e) {
      // do nothing
    }
  }
 
  would be more robust than
 
  while(it.hasNext()) {
  ...
  ...
  ...
  }
 
  i.e. does setting a consumer timeout, catching it, and again just waiting
  for the next message make it more robust?
 
  Regards,
  Rahul.
 
 


Re: Very slow producer

2014-12-11 Thread Ewen Cheslack-Postava
Did you set producer.type to async when creating your producer? The console
producer uses async by default, but the default producer config is sync.
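
For reference, a rough sketch (not from the original reply) of an async
configuration for the old Scala producer used here; the broker list and batch
size are placeholders:

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.ProducerConfig;

    public class AsyncProducerSetup {
        public static void main(String[] args) {
            // Placeholder broker list; the important line is producer.type=async,
            // which batches sends on a background thread instead of blocking per message.
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "async");
            props.put("batch.num.messages", "200"); // messages per async batch

            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
            // ... send KeyedMessage<String, String> instances, then close.
            producer.close();
        }
    }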

-Ewen

On Thu, Dec 11, 2014 at 6:08 AM, Huy Le Van huy.le...@insight-centre.org
wrote:

 Hi,


 I’m writing my own producer to read from text files, and send line by line
 to Kafka cluster. I notice that the producer is extremely slow. It's
 currently sending at ~57KB/node/s. This is like 50-100 times slower than
 using bin/kafka-console-producer.sh


 Here’s my producer:
 final File dir = new File(dataDir);
 List<File> files = new ArrayList<File>(Arrays.asList(dir.listFiles()));
 int key = 0;
 for (final File file : files) {
     try {
         BufferedReader br = new BufferedReader(new FileReader(file));
         for (String line = br.readLine(); line != null; line = br.readLine()) {
             KeyedMessage<String, String> data =
                 new KeyedMessage<String, String>(topic, Integer.toString(key++), line);
             producer.send(data);
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
 }



 And partitioner:
 public int partition(Object key, int numPartitions) {
 String stringKey = (String)key;
 return Integer.parseInt(stringKey) % numPartitions;
 }


 The only difference between kafka-console-producer.sh code and my code is
 that I use a custom partitioner. I have no idea why it’s so slow.

 Best regards,Huy, Le Van




-- 
Thanks,
Ewen


Re: Kafka System test

2015-01-23 Thread Ewen Cheslack-Postava
1. Except for that hostname setting being a list instead of a single host,
the changes look reasonable. That is where you want to customize settings
for your setup.

2 & 3. Yes, you'll want to update those files as well. The top-level ones
provide defaults; the ones in specific test directories provide overrides
for that specific test. But they aren't combined in any way, i.e. the more
specific one is just taken as a whole rather than being like a diff, so you
do have to update both.

You might want to take a look at
https://issues.apache.org/jira/browse/KAFKA-1748. Currently if you want to
run all tests it's a pain to change the hosts they're running on since it
requires manually editing all those files. The patch gets rid of
cluster_config.json and provides a couple of different ways of configuring
the cluster -- run everything on localhost, get cluster info from a single
json file, or get the ssh info from Vagrant.



On Fri, Jan 23, 2015 at 11:50 AM, Sa Li sal...@gmail.com wrote:

 Hi, All

 From my last ticket (Subject: kafka production server test), Guozhang
 kindly pointed me to the system test package that comes with the Kafka
 source build, which is a really cool package. I took a look at it; things
 are clear if I run it on localhost: I don't need to change anything, say,
 cluster_config.json defines entities, and the system test reads
 testcase__properties.json to override the properties in
 cluster_config.json. For example, cluster_config.json defaults hostname to
 localhost with three brokers, so I assume it will create 3 brokers on
 localhost and continue the test.

 Currently I have installed the package on a Vagrant VM, and would like to
 run the system test on the VM and remotely access production to test the
 production cluster. The production cluster has 3 nodes. The Kafka
 production cluster is on top of a 5-node zookeeper ensemble. My question is
 how to effectively change the properties in the Vagrant system test package.

 1. change on cluster_config.json, like
 {
     "entity_id": "0",
     "hostname": "10.100.70.28,10.100.70.29,10.100.70.30,10.100.70.31,10.100.70.32",
     "role": "zookeeper",
     "cluster_name": "target",
     "kafka_home": "/etc/kafka",
     "java_home": "/usr/lib/jvm/java-7-openjdk-amd64/jre",
     "jmx_port": "9990"
 },
 {
     "entity_id": "1",
     "hostname": "10.100.70.28",
     "role": "broker",
     "cluster_name": "target",
     "kafka_home": "/etc/kafka",
     "java_home": "/usr/lib/jvm/java-7-openjdk-amd64/jre",
     "jmx_port": "9991"
 },

  Here, because I want to test remote servers, I need to change the
 cluster_name to target, right?

 2.  In directory ./replication_testsuite/config/ , for all the properties
 files, do I need to change them all to be the same as the properties on
 production servers?

 3. In ./replication_testsuite/testcase_/, it seems I need to make
 corresponding changes as well to keep consistent with
 ./config/properties, such as
 log.dir: /tmp/kafka_server_1_logs will be changed to the log.dir in my
 production server.properties, is that right?


 Hope someone who has done the system test on remote server can share some
 experience, thanks



 AL

 --

 Alec Li




-- 
Thanks,
Ewen


Re: Kafka System test

2015-01-23 Thread Ewen Cheslack-Postava
On Fri, Jan 23, 2015 at 2:42 PM, Sa Li sal...@gmail.com wrote:

 Also I found that ./kafka/system_test/cluster_config.json is duplicated in
 each directory ./kafka/system_test/replication_testsuite/testcase_/


Not duplicated, but customized:

$ diff -u system_test/cluster_config.json
system_test/replication_testsuite//testcase_0021/cluster_config.json
--- system_test/cluster_config.json    2014-12-17 17:43:01.0 -0800
+++ system_test/replication_testsuite//testcase_0021/cluster_config.json
    2014-12-17 17:43:01.0 -0800
@@ -48,11 +48,29 @@
         {
             "entity_id": "5",
             "hostname": "localhost",
-            "role": "console_consumer",
+            "role": "producer_performance",
             "cluster_name": "source",
             "kafka_home": "default",
             "java_home": "default",
             "jmx_port": "9998"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": 
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name": "source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9099"
+        }
     ]
 }


  When I change ./kafka/system_test/cluster_config.json, do I need to
  overwrite each
  ./kafka/system_test/replication_testsuite/testcase_/cluster_config.json
  as well?


You'll need to edit all the cluster_config.json files for directories of
the tests you want to run. Note that this applies to entire test suites
too, e.g. there is
system_test/offset_management_testsuite/cluster_config.json which overrides
the default cluster_config.json for all tests in
offset_management_testsuite.


 Thanks

 AL



 On Fri, Jan 23, 2015 at 1:39 PM, Sa Li sal...@gmail.com wrote:

   Thanks for the reply. Ewen, pertaining to your statement "... hostname
   setting being a list instead of a single host", are you referring to
   entity_id 1 or 0?
  
   "entity_id": "0",
   "hostname":
   "10.100.70.28,10.100.70.29,10.100.70.30,10.100.70.31,10.100.70.32",
  
   "entity_id": "1",
   "hostname": "10.100.70.28",


Entity here means a single service on a single machine, i.e. one zookeeper
instance. That's why the default cluster_config.json has 3 separate broker
entries.

-Ewen


 
   I thought the zookeeper role has multiple hosts, so I listed all the IPs
   of the ensemble, while entity 1 is about only one broker (my production
   cluster design fires up one broker on each host, so 3 nodes with 3
   brokers), so I specified only one hostname IP there. How do I change
   this?
 
 
  thanks
 
  AL
 
  On Fri, Jan 23, 2015 at 1:22 PM, Ewen Cheslack-Postava 
 e...@confluent.io
  wrote:
 
  1. Except for that hostname setting being a list instead of a single
 host,
  the changes look reasonable. That is where you want to customize
 settings
  for your setup.
 
  2  3. Yes, you'll want to update those files as well. They top-level
 ones
  provide defaults, the ones in specific test directories provide
 overrides
  for that specific test. But they aren't combined in any way, i.e. the
 more
  specific one is just taken as a whole rather than being like a diff, so
  you
  do have to update both.
 
  You might want to take a look at
  https://issues.apache.org/jira/browse/KAFKA-1748. Currently if you want
  to
  run all tests it's a pain to change the hosts they're running on since
 it
  requires manually editing all those files. The patch gets rid of
  cluster_config.json and provides a couple of different ways of
 configuring
  the cluster -- run everything on localhost, get cluster info from a
 single
  json file, or get the ssh info from Vagrant.
 
 
 
  On Fri, Jan 23, 2015 at 11:50 AM, Sa Li sal...@gmail.com wrote:
 
   Hi, All
  
   From my last ticket (Subject: kafka production server test), Guozhang
   kindly point me the system test package come with kafka source build
  which
   is really cool package. I took a look at this package, things are
 clear
  is
   I run it on localhost, I don't need to change anything, say,
   cluster_config.json defines entities, and system test reads
   testcase__properties.json to override the properties in
   cluster_config.json. For example, cluster_config.json defaults
 hostname
  as
   localhost, and three brokers, I assume it will create 3 brokers in
   localhost and continue the test.
  
   Currently I install the package on a vagrant VM, and like to run the
  system
   test on VM and remotely access production to test production cluster.
  The
   production cluster has 3 nodes. kafka production cluster is on top of
 a
   5-node zookeeper ensemble.  My questions is how to effectively change
  the
   properties on vagrant system test package.
  
   1. change on cluster_config.json, like
   {
   entity_id: 0,
   hostname

Re: How to run Kafka Producer in Java environment? How to set mainClass in pom file in EC2 instance?

2015-01-20 Thread Ewen Cheslack-Postava
You should only need jar.with.dependencies.jar -- maven-assembly-plugin's
jar-with-dependencies mode collects all your code and project dependencies
into one jar file. It looks like the problem is that your mainClass is set
to only 'HelloKafkaProducer'. You need to specify the full name
'com.spnotes.kafka.HelloKafkaProducer'. Then running

java -jar jar.with.dependencies.jar

should work. Alternatively, you can put the jar on the class path and then
specify the exact class you want to run

java -cp jar.with.dependencies.jar com.spnotes.kafka.HelloKafkaProducer


On Tue, Jan 20, 2015 at 11:43 AM, Su She suhsheka...@gmail.com wrote:

 Hello Everyone,

 Sorry for the duplicate post, I hadn't subscribed to the list yet, so I
 couldn't reply to suggestions. That has been fixed now.

 1) Using Maven, I wrote a Kafka Producer similar to the one found here:

 https://github.com/pppsunil/HelloKafka/blob/master/src/main/java/com/spnotes/kafka/HelloKafkaProducer.java

 2) This is my pom file:

 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>HelloKafka</groupId>
   <artifactId>HelloKafka</artifactId>
   <version>1.0-SNAPSHOT</version>
   <build>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>
         <version>2.2</version>
         <!-- nothing here -->
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
         <version>2.2-beta-4</version>
         <configuration>
           <descriptorRefs>
             <descriptorRef>jar-with-dependencies</descriptorRef>
           </descriptorRefs>
           <archive>
             <manifest>
               <mainClass>HelloKafkaProducer</mainClass>
             </manifest>
           </archive>
         </configuration>
         <executions>
           <execution>
             <phase>package</phase>
             <goals><goal>single</goal></goals>
           </execution>
         </executions>
       </plugin>
     </plugins>
   </build>
   <dependencies>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_2.10</artifactId>
       <version>0.8.2-beta</version>
       <scope>compile</scope>
       <exclusions>
         <exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion>
         <exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion>
         <exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion>
       </exclusions>
     </dependency>
     <dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.5.6</version></dependency>
     <dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.10.4</version></dependency>
     <dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>2.10.4</version></dependency>
     <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.10</artifactId><version>0.8.2-beta</version></dependency>
     <dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.3</version></dependency>
     <dependency><groupId>com.yammer.metrics</groupId><artifactId>metrics-core</artifactId><version>2.2.0</version></dependency>
     <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.8.2-beta</version></dependency>
     <dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>2.4</version></dependency>
   </dependencies>
 </project>

 3) I am really lost on how to run kafka producer from the command line. I
 have followed the instructions here as seen in my above pom:

 http://stackoverflow.com/questions/1814526/problem-building-executable-jar-with-maven

 4) So I have my HKP.java file, then the pom file I have shown above and
 then I do mvn clean install. It builds the target directory with two jar
 files...what is my next step? I try java -cp jar.with.dependencies.jar -jar
 main.jar, but it says no main manifest attribute. I try java -cp
 jar.with.dependencies HKP, but it says it can't find class HKP

 I have a feeling I am not writing the mainClass properly as most online
 examples say com.example.class, but I'm not sure how that translates to
 working on EC2.

 Thanks a lot for the help!

 -Su




-- 
Thanks,
Ewen


Re: Poor performance running performance test

2015-01-28 Thread Ewen Cheslack-Postava
That error indicates the broker closed the connection for some reason. Any
useful logs from the broker? It looks like you're using ELB, which could
also be the culprit. A connection timeout seems doubtful, but ELB can also
close connections for other reasons, like failed health checks.

-Ewen

On Tue, Jan 27, 2015 at 11:21 PM, Dillian Murphey crackshotm...@gmail.com
wrote:

 I was running the performance command from a VirtualBox server, so that
 seems like it was part of the problem.  I'm getting better results running
 this on a server on AWS, but that's kind of expected.  Can you look at
 these results, and comment on the occasional warning I see?  I appreciate
 it!

 1220375 records sent, 243928.6 records/sec (23.26 MB/sec), 2111.5 ms avg
 latency, 4435.0 max latency.
 1195090 records sent, 239018.0 records/sec (22.79 MB/sec), 2203.1 ms avg
 latency, 4595.0 max latency.
 1257165 records sent, 251433.0 records/sec (23.98 MB/sec), 2172.6 ms avg
 latency, 4525.0 max latency.
 1230981 records sent, 246196.2 records/sec (23.48 MB/sec), 2173.5 ms avg
 latency, 4465.0 max latency.
 [2015-01-28 07:19:07,274] WARN Error in I/O with
 myawsloadbalancer(org.apache.kafka.common.network.Selector)
 java.io.EOFException
 at

 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
 at java.lang.Thread.run(Thread.java:745)
 1090689 records sent, 218137.8 records/sec (20.80 MB/sec), 2413.6 ms avg
 latency, 4829.0 max latency.

 On Tue, Jan 27, 2015 at 7:37 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

  Where are you running ProducerPerformance in relation to ZK and the Kafka
  brokers? You should definitely see much higher performance than this.
 
  A couple of other things I can think of that might be going wrong: Are
 all
  your VMs in the same AZ? Are you storing Kafka data in EBS or local
  ephemeral storage? If EBS, have you provisioned enough IOPS.
 
 
  On Tue, Jan 27, 2015 at 4:29 PM, Dillian Murphey 
 crackshotm...@gmail.com
  wrote:
 
   I'm a new Kafka user/admin. I'm running a 3-node ZK ensemble and 6
   brokers on AWS.
  
   The performance I'm seeing is shockingly bad. I need some advice!
  
   bin/kafka-run-class.sh
 org.apache.kafka.clients.tools.ProducerPerformance
   test2 5000 100 -1 acks=1 bootstrap.servers=5myloadbalancer:9092
   buffer.memory=67108864 batch.size=8196
  
  
  
  
   6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg
   latency, 4306.0 max latency.
   71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg
   latency, 9053.0 max latency.
   65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg
   latency, 13809.0 max latency.
   71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg
   latency, 18541.0 max latency.
  
   Thanks for any help!
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: Poor performance running performance test

2015-01-27 Thread Ewen Cheslack-Postava
Where are you running ProducerPerformance in relation to ZK and the Kafka
brokers? You should definitely see much higher performance than this.

A couple of other things I can think of that might be going wrong: Are all
your VMs in the same AZ? Are you storing Kafka data in EBS or local
ephemeral storage? If EBS, have you provisioned enough IOPS.


On Tue, Jan 27, 2015 at 4:29 PM, Dillian Murphey crackshotm...@gmail.com
wrote:

 I'm a new Kafka user/admin. I'm running a 3-node ZK ensemble and 6 brokers
 on AWS.

 The performance I'm seeing is shockingly bad. I need some advice!

 bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
 test2 5000 100 -1 acks=1 bootstrap.servers=5myloadbalancer:9092
 buffer.memory=67108864 batch.size=8196




 6097 records sent, 13198.3 records/sec (1.26 MB/sec), 2098.0 ms avg
 latency, 4306.0 max latency.
 71695 records sent, 14339.0 records/sec (1.37 MB/sec), 6658.1 ms avg
 latency, 9053.0 max latency.
 65195 records sent, 13028.6 records/sec (1.24 MB/sec), 11504.0 ms avg
 latency, 13809.0 max latency.
 71955 records sent, 14391.0 records/sec (1.37 MB/sec), 16137.4 ms avg
 latency, 18541.0 max latency.

 Thanks for any help!




-- 
Thanks,
Ewen


Re: SimpleConsumer leaks sockets on an UnresolvedAddressException

2015-01-27 Thread Ewen Cheslack-Postava
This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
will only be included in 0.8.2.

Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
is still open and there's a comment that moved it to 0.9 after the commit
was already made. Was the commit a mistake or did we just forget to close
it?

On Tue, Jan 27, 2015 at 10:20 AM, Rajiv Kurian ra...@signalfuse.com wrote:

 Here is the relevant stack trace:

 java.nio.channels.UnresolvedAddressException: null

 at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]

 at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644)
 ~[na:1.7.0_55]

 at kafka.network.BlockingChannel.connect(Unknown Source)
 ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.consumer.SimpleConsumer.connect(Unknown Source)
 ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown
 Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

 at

 kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
 Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

 at

 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
 Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

 at

 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
 Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

 at

 kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
 Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.metrics.KafkaTimer.time(Unknown Source)
 ~[kafka_2.10-0.8.0.jar:0.8.0]

 at
 kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
 ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
 Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
 Source) ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.metrics.KafkaTimer.time(Unknown Source)
 ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
 ~[kafka_2.10-0.8.0.jar:0.8.0]

 at kafka.javaapi.consumer.SimpleConsumer.fetch(Unknown Source)
 ~[kafka_2.10-0.8.0.jar:0.8.0]

 On Tue, Jan 27, 2015 at 10:19 AM, Rajiv Kurian ra...@signalfuse.com
 wrote:

  I am using 0.8.1. The source is here:
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
 
  Here is the definition of disconnect():
  private def disconnect() = {
    if(blockingChannel.isConnected) {
      debug("Disconnecting from " + host + ":" + port)
      blockingChannel.disconnect()
    }
  }
  It checks if blockingChannel.isConnected before calling
  blockingChannel.disconnect(). I think if there is an
  UnresolvedAddressException, the isConnected is never set and the
  blockingChannel.disconnect() is never called. But by this point we have
  already created a socket and will leak it.
 
  The same problem might be present in the connect method of the
  BlockingChannel at
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/network/BlockingChannel.scala
 .
  Though its own disconnect method seems to check for both the connected:
 
  def disconnect() = lock synchronized {
    // My comment: connected will not be set if we get an
    // UnresolvedAddressException but channel should NOT be null, so we will
    // probably still do the right thing.
    if(connected || channel != null) {
      // closing the main socket channel *should* close the read channel
      // but let's do it to be sure.
      swallow(channel.close())
      swallow(channel.socket.close())
      if(readChannel != null) swallow(readChannel.close())
      channel = null; readChannel = null; writeChannel = null
      connected = false
    }
  }
 
 
 
  On Tue, Jan 27, 2015 at 9:03 AM, Guozhang Wang wangg...@gmail.com
 wrote:
 
  Rajiv,
 
  Which version of Kafka are you using? I just checked SimpleConsumer's
  code,
  and in its close() function, disconnect() is called, which will close
 the
  socket.
 
  Guozhang
 
 
  On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian ra...@signalfuse.com
  wrote:
 
   Meant to write a run loop.
  
    void run() {
      while (running) {
        if (simpleConsumer == null) {
          simpleConsumer = new SimpleConsumer(host, port,
              (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
        }
        try {
          // Do stuff with simpleConsumer.
        } catch (Exception e) {
          logger.error(e);  // Assume UnresolvedAddressException.
          if (simpleConsumer != null) {
            simpleConsumer.close();
            simpleConsumer = null;
          }
        }
      }
    }
  
   On Mon, Jan 26, 2015 at 2:27 PM, Rajiv Kurian ra...@signalfuse.com
   wrote:
  
Here is my typical flow:
void run() {
  if (simpleConsumer == null) {
simpleConsumer = new SimpleConsumer(host, port, (int)
   

Re: Get replication and partition count of a topic

2015-01-12 Thread Ewen Cheslack-Postava
I think the closest thing to what you want is
ZkUtils.getPartitionsForTopics, which returns a list of partition IDs for
each topic you specify.

-Ewen

On Mon, Jan 12, 2015 at 12:55 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 Hi,

 kafka-topics.sh script can be used to retrieve topic information.

 Ex: sh kafka-topics.sh --zookeeper localhost:2181 --describe --topic TOPIC1

 You can look into TopicCommand.scala code

 https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=blob;f=core/src/main/scala/kafka/admin/TopicCommand.scala;hb=HEAD

 On Mon, Jan 12, 2015 at 2:01 PM, Ankit Jain ankitm.j...@impetus.co.in
 wrote:

  Hi All,
 
 
  I want to get the replication and partition count of a topic. I tried the
  following piece of code:
 
 
  java.util.Set<String> topics = new HashSet<String>();
  topics.add(topicName);
  Set<TopicMetadata> topicMetadatas =
      AdminUtils.fetchTopicMetadataFromZk(JavaConversions.asScalaSet(topics),
      zkClient);
  Iterator<TopicMetadata> topicMetadataIterator = topicMetadatas.iterator();
 
  while (topicMetadataIterator.hasNext()) {
      Iterator<PartitionMetadata> partitionMetadataIterator =
          topicMetadataIterator.next().partitionsMetadata().iterator();
 
  }
 
 
  But the above code returns the metadata of each partition and also the
  replica details of each partition.
 
 
  Is there any simple API available in Kafka to get the partition and
  replica counts for a topic?
 
 
  Thanks,
 
  Ankit
 
 
  
 
 
 
 
 
 
 




-- 
Thanks,
Ewen


Re: New 0.8.2 client compatibility with 0.8.1.1 during failure cases

2015-01-06 Thread Ewen Cheslack-Postava
Paul,

That behavior is currently expected, see
https://issues.apache.org/jira/browse/KAFKA-1788. There are currently no
client-side timeouts in the new producer, so the message just sits there
forever waiting for the server to come back so it can try to send it.

If you already have tests for a variety of failure modes it might be
helpful to contribute them to the WIP patch in that JIRA. Currently the
patch only adds one unit test for RecordAccumulator, a few tests using the
full server would be an improvement.

-Ewen

On Tue, Jan 6, 2015 at 7:54 AM, Paul Pearcy ppea...@gmail.com wrote:

 Hello,
   I have some of my own test cases very similar to the ones
 in ProducerFailureHandlingTest.

 I moved to the new producer against 0.8.1.1 and all of my test cases around
 disconnects fail. Moving to 0.8.2-beta on the server side, things succeed.

 Here is an example test:
 - Healthy producer/server setup
 - Stop the server
 - Send a message
 - Call get on the future and it never returns. Doesn't matter if the server
 is started again or remains stopped

 So, I believe that the 0.8.2 producer is compatible with 0.8.1.1 in the
 happy case, but fails to return futures in cases where disconnects have
 occurred.

 Is it possible to run the test cases in ProducerFailureHandlingTest.scala
 against 0.8.1.1?

 Thanks,
 Paul




-- 
Thanks,
Ewen


Re: schema.registry.url = null

2015-03-17 Thread Ewen Cheslack-Postava
Clint,

Your code looks fine and the output doesn't actually have any errors, but
you're also not waiting for the messages to be published. Try changing

producer.send(data);

to

producer.send(data).get();

to block until the message has been acked. If it runs and exits
cleanly, then you should be able to see it using a consumer, e.g.
kafka-avro-console-consumer.

The warning that you're seeing is due to the KafkaProducer's configuration
class not using the schema.registry.url setting; the same settings are
passed on to the serializers which do use it. It incorrectly reports the
value as null due to a bug, I filed
https://issues.apache.org/jira/browse/KAFKA-2026 to address that.

By the way, for Confluent stuff that's not part of the Apache Kafka
repository, you might want to ask questions on this list instead:
https://groups.google.com/forum/#!forum/confluent-platform


On Tue, Mar 17, 2015 at 8:04 AM, Clint Mcneil clintmcn...@gmail.com wrote:

 Hi

 I can't get the Kafka/Avro serializer producer example to work.

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;

 import java.util.Properties;

 /**
  * Created by clint on 3/17/15.
  */
 public class Confluent {

     public static void main(String[] args) {

         KafkaProducer<Object, Object> producer;
         Properties propsKafka = new Properties();

         propsKafka.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         propsKafka.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
             io.confluent.kafka.serializers.KafkaAvroSerializer.class);
         propsKafka.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
             io.confluent.kafka.serializers.KafkaAvroSerializer.class);
         propsKafka.put("schema.registry.url", "http://localhost:8081");
         producer = new KafkaProducer<Object, Object>(propsKafka);

         String key = "key1";
         String userSchema = "{\"type\":\"record\"," +
             "\"name\":\"myrecord\"," +
             "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";

         Schema.Parser parser = new Schema.Parser();
         Schema schema = parser.parse(userSchema);

         GenericRecord avroRecord = new GenericData.Record(schema);
         avroRecord.put("f1", "value4");

         ProducerRecord<Object, Object> data =
             new ProducerRecord<Object, Object>("test", key, avroRecord);
         producer.send(data);

     }
 }


 The output is:

 Mar 17, 2015 5:00:31 PM org.apache.kafka.common.config.AbstractConfig
 logAll
 INFO: ProducerConfig values:
 compression.type = none
 metric.reporters = []
 metadata.max.age.ms = 30
 metadata.fetch.timeout.ms = 6
 acks = 1
 batch.size = 16384
 reconnect.backoff.ms = 10
 bootstrap.servers = [localhost:9092]
 receive.buffer.bytes = 32768
 retry.backoff.ms = 100
 buffer.memory = 33554432
 timeout.ms = 3
 key.serializer = class
 io.confluent.kafka.serializers.KafkaAvroSerializer
 retries = 0
 max.request.size = 1048576
 block.on.buffer.full = true
 value.serializer = class
 io.confluent.kafka.serializers.KafkaAvroSerializer
 metrics.sample.window.ms = 3
 send.buffer.bytes = 131072
 max.in.flight.requests.per.connection = 5
 metrics.num.samples = 2
 linger.ms = 0
 client.id =

 Mar 17, 2015 5:00:32 PM org.apache.kafka.common.config.AbstractConfig
 logUnused
 WARNING: The configuration schema.registry.url = null was supplied but
 isn't a known config.

 Please help

 Thanks




-- 
Thanks,
Ewen


Re: High Level Consumer Example in 0.8.2

2015-03-11 Thread Ewen Cheslack-Postava
That example still works, the high level consumer interface hasn't changed.

There is a new high level consumer on the way and an initial version has
been checked into trunk, but it won't be ready to use until 0.9.


On Wed, Mar 11, 2015 at 9:05 AM, ankit tyagi ankittyagi.mn...@gmail.com
wrote:

 Hi All,

 we are upgrading our kafka client version from 0.8.0 to 0.8.2.

 Is there any document for the high-level Kafka consumer with multiple
 threads, like
 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example,
 for this newer version?




-- 
Thanks,
Ewen


Re: Possible to count for unclosed resources in process

2015-03-06 Thread Ewen Cheslack-Postava
You could also take a thread dump to try to find them by their network
threads. For example this is how new producer network threads are named:

String ioThreadName = "kafka-producer-network-thread" +
    (clientId.length() > 0 ? " | " + clientId : "");
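
A rough sketch (not from the original message) of using thread names like
these to spot leaked clients at runtime; the consumer thread-name pattern is
an assumption and may differ across versions:

    public class LeakedClientCount {
        public static void main(String[] args) {
            int count = 0;
            for (Thread t : Thread.getAllStackTraces().keySet()) {
                String name = t.getName();
                // "kafka-producer-network-thread" follows the snippet above; the
                // "ConsumerFetcherThread" prefix is an assumption for the old consumer.
                if (name.startsWith("kafka-producer-network-thread")
                        || name.contains("ConsumerFetcherThread")) {
                    System.out.println("Possible live Kafka client thread: " + name);
                    count++;
                }
            }
            System.out.println("Total: " + count);
        }
    }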



On Fri, Mar 6, 2015 at 1:04 PM, Gwen Shapira gshap...@cloudera.com wrote:

 It doesn't keep track specifically, but there are open sockets that may
 take a while to clean themselves up.

 Note that if you use the async producer and don't close the producer
 nicely, you may miss messages as the connection will close before all
 messages are sent. Guess how we found out? :)

 Similar for consumer, if you use high level consumer and don't close the
 consumer nicely, you may not acknowledge the last messages and they will be
 re-read next time the consumer starts, leading to duplicates.

 Gwen



 On Fri, Mar 6, 2015 at 12:40 PM, Stuart Reynolds s...@stureynolds.com
 wrote:

  One of our staff has has been terrible at adding finally clauses to
  close kafka resources.
 
  Does the kafka scala/Java client maintain a count or list of open
  producers/consumers/client connections?
 




-- 
Thanks,
Ewen


Re: Which node do you send data to?

2015-03-06 Thread Ewen Cheslack-Postava
Spencer,

Kafka (and its clients) handle failover automatically for you. When you
create a topic, you can select a replication factor. For a replication
factor n, each partition of the topic will be replicated to n different
brokers. At any given time, one of those brokers is considered the leader
for that partition and that is the only server you can communicate with to
produce new messages. That broker will then make sure the data is copied to
the other replicas. If that leader fails, one of the replicas will take
over and the producer will have to send data to that node.

But all of this should happen automatically. As long as you set the
bootstrap.servers setting (for the new producer) so that at least one of
them is always available (ideally it should just include all the brokers),
then you shouldn't have to worry about this. Of course if a node fails you
have to deal with bringing it back up/moving it to a new machine, but
producers should continue to function normally by moving traffic to the new
leader.
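
For reference, a minimal sketch (not from the original reply) of a new-producer
setup that lists several brokers for bootstrapping; the host and topic names
are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FailoverFriendlyProducer {
        public static void main(String[] args) {
            // List several (ideally all) brokers so the client can bootstrap even if
            // one is down; after bootstrapping it discovers the full cluster and
            // routes requests to the current partition leaders itself.
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"));
            producer.close();
        }
    }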



On Fri, Mar 6, 2015 at 9:05 PM, Daniel Moreno d...@max2.com wrote:

 Hi Spencer,

 You can configure your producers with a list of brokers. You can add all,
 but usually at least two of the brokers in your cluster.

 Kind Regards,

 Daniel Moreno


 On Mar 6, 2015, at 23:43, Spencer Owen so...@netdocuments.com wrote:

 I've setup a kafka cluster with 3 nodes.

 Which node should I push the data to? I would normally push to kafka01,
 but if that node goes down, then the entire cluster goes down.

 How have other people solved this. Maybe a nginx reverse proxy?



 http://stackoverflow.com/questions/28911410/which-node-should-i-push-data-to-in-a-cluster
 




-- 
Thanks,
Ewen


Re: High Level Consumer Example in 0.8.2

2015-03-12 Thread Ewen Cheslack-Postava
You actually only need kafka_2.10-0.8.2.1 because it depends on
kafka-clients-0.8.2.1 so the new producer code will get pulled in
transitively. But there's nothing wrong with explicitly stating the
dependency. However, I wouldn't mix versions (i.e. use
kafka-clients-0.8.2.1 instead of kafka-clients-0.8.2.0) -- no need to
include two different copies of nearly identical code!

Long term, we want you to only have to depend on kafka-clients-<version>,
but for now if you need both producer and consumer in the same project, you
should depend on the core Kafka jar.

On Wed, Mar 11, 2015 at 10:54 PM, ankit tyagi ankittyagi.mn...@gmail.com
wrote:

 Hi Eden,

 Just One Question,

 if I want to have the implementation for the new producer, then I need
 to include the dependencies below:

  1. *kafka-clients-0.8.2.0.jar (for the new producer implementation)*
 * 2. kafka_2.10-0.8.2.1 jar (for the consumer)*

 As I see, the producer package is different in the two jars, so there won't
 be any conflicts.



 On Thu, Mar 12, 2015 at 10:51 AM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Ah, I see the confusion now. The kafka-clients jar was introduced only
  recently and is meant to hold the new clients, which are pure Java
  implementations and cleanly isolated from the server code. The old
 clients
  (including what we call the old consumer since a new consumer is being
  developed, but is really the current consumer) are in the core module.
  For that you want a kafka_<scala_version>.jar. For example, this artifact
  should work for you:
 
 
 http://search.maven.org/#artifactdetails|org.apache.kafka|kafka_2.10|0.8.2.1|jar
 
  By the way, you probably also want 0.8.2.1 now since it fixed a few
  important bugs in 0.8.2.0.
 
  -Ewen
 
  On Wed, Mar 11, 2015 at 10:08 PM, ankit tyagi 
 ankittyagi.mn...@gmail.com
  wrote:
 
   Hi Ewen,
  
   I am using* kafka-clients-0.8.2.0.jar* client jar and i don't see above
   interface in that. which dependency do i need to include??
  
   On Wed, Mar 11, 2015 at 10:17 PM, Ewen Cheslack-Postava 
  e...@confluent.io
   
   wrote:
  
That example still works, the high level consumer interface hasn't
   changed.
   
There is a new high level consumer on the way and an initial version
  has
been checked into trunk, but it won't be ready to use until 0.9.
   
   
On Wed, Mar 11, 2015 at 9:05 AM, ankit tyagi 
  ankittyagi.mn...@gmail.com
   
wrote:
   
 Hi All,

 we are upgrading our kafka client version from 0.8.0 to 0.8.2.

 Is there any document  for High level kafka consumer withMultiple
   thread
 like

  
 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 for this newer version.

   
   
   
--
Thanks,
Ewen
   
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: High Level Consumer Example in 0.8.2

2015-03-11 Thread Ewen Cheslack-Postava
Ah, I see the confusion now. The kafka-clients jar was introduced only
recently and is meant to hold the new clients, which are pure Java
implementations and cleanly isolated from the server code. The old clients
(including what we call the old consumer since a new consumer is being
developed, but is really the current consumer) are in the core module.
For that you want a kafka_<scala_version>.jar. For example, this artifact
should work for you:
http://search.maven.org/#artifactdetails|org.apache.kafka|kafka_2.10|0.8.2.1|jar

By the way, you probably also want 0.8.2.1 now since it fixed a few
important bugs in 0.8.2.0.

-Ewen

On Wed, Mar 11, 2015 at 10:08 PM, ankit tyagi ankittyagi.mn...@gmail.com
wrote:

 Hi Ewen,

 I am using* kafka-clients-0.8.2.0.jar* client jar and i don't see above
 interface in that. which dependency do i need to include??

 On Wed, Mar 11, 2015 at 10:17 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  That example still works, the high level consumer interface hasn't
 changed.
 
  There is a new high level consumer on the way and an initial version has
  been checked into trunk, but it won't be ready to use until 0.9.
 
 
  On Wed, Mar 11, 2015 at 9:05 AM, ankit tyagi ankittyagi.mn...@gmail.com
 
  wrote:
 
   Hi All,
  
   we are upgrading our kafka client version from 0.8.0 to 0.8.2.
  
   Is there any document  for High level kafka consumer withMultiple
 thread
   like
  
 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
   for this newer version.
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: Message routing, Kafka-to-REST and HTTP API tools/frameworks for Kafka?

2015-03-25 Thread Ewen Cheslack-Postava
For 3, Confluent wrote a REST proxy that's pretty comprehensive. See the
docs: http://confluent.io/docs/current/kafka-rest/docs/intro.html and a
blog post describing it + future directions:
http://blog.confluent.io/2015/03/25/a-comprehensive-open-source-rest-proxy-for-kafka/

There are a few other REST proxies:
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST
But I don't think any of them support everything you need yet --
specifically, security stuff isn't included in any of them yet. You could
address this with some performance hit by putting another HTTP server like
nginx in front of the proxies (locally with each instance) to get that
control. For Confluent's proxy, we're also thinking about how to add
security features since we'll need something to protect admin operations:
https://github.com/confluentinc/kafka-rest/wiki/Project---Admin-APIs

-Ewen

On Tue, Mar 24, 2015 at 11:34 PM, Nagesh nageswara.r...@gmail.com wrote:

 Hi,

 I think for 2) you can use a Kafka consumer and push messages to the Vert.x
 event bus, which already has a REST implementation (vertx-jersey).

 I would say a Vert.x cluster can be used to receive data irrespective of
 topic and then publish to a particular Kafka topic. Then consume messages
 from Kafka with different consumers and distribute them. Kafka can hold
 messages without dropping them during bursts, and even when the downstream
 slows down.

 Regards,
 Nageswara Rao

 On Wed, Mar 25, 2015 at 10:58 AM, Manoj Khangaonkar khangaon...@gmail.com
 
 wrote:

  Hi,
 
  For (1) and perhaps even for (2) where distribution/filtering on scale is
  required, I would look at using Apache Storm with kafka.
 
  For (3) , it seems you just need REST services wrapping kafka
  consumers/producers. I would start with usual suspects like jersey.
 
  regards
 
  On Tue, Mar 24, 2015 at 12:06 PM, Valentin kafka-...@sblk.de
 wrote:
 
  
   Hi guys,
  
   we have three Kafka use cases for which we have written our own PoC
   implementations,
   but where I am wondering whether there might be any fitting open source
   solution/tool/framework out there.
   Maybe someone of you has some ideas/pointers? :)
  
   1) Message routing/distribution/filter tool
   We need to copy messages from a set of input topics to a set of output
   topics
   based on their message key values. Each message in an input topic will
 go
   to 0 to N output topics,
   each output topic will receive messages from 0 to N input topics.
   So basically the tool acts as a message routing component in our
 system.
   Example configuration:
   input topic A:output topic K:key value 1,key value 2,key value
  3
   input topic A:output topic L:key value 2,key value 4
   input topic B:output topic K:key value 5,key value 6
   ...
   It would also be interesting to define distribution/filter rules based
 on
   regular expressions on the message key or message body.
  
   2) Kafka-to-REST Push service
   We need to consume messages from a set of topics, translate them into
  REST
   web service calls
   and forward the data to existing, non-Kafka-aware systems with REST
 APIs
   that way.
  
   3) HTTP REST API for consumers and producers
   We need to expose the simple consumer and the producer functionalities
  via
   REST web service calls,
   with authentication and per-topic-authorization on REST API level and
 TLS
   for transport encryption.
   Offset tracking is done by the connected systems, not by the
   broker/zookeeper/REST API.
   We expect a high message volume in the future here, so performance
 would
   be a key concern.
  
   Greetings
   Valentin
  
 
 
 
  --
  http://khangaonkar.blogspot.com/
 



 --
 Thanks,
 Nageswara Rao.V

 *The LORD reigns*




-- 
Thanks,
Ewen


Re: schemaregistry example

2015-03-31 Thread Ewen Cheslack-Postava
The name for the integer type in Avro is "int", not "integer". Your command
should work if you change field2's type.

-Ewen

On Tue, Mar 31, 2015 at 1:51 AM, Clint Mcneil clintmcn...@gmail.com wrote:

 Hi guys

 When trying the example schema in
 http://confluent.io/docs/current/schema-registry/docs/api.html

 POST /subjects/test HTTP/1.1
 Host: schemaregistry.example.com
 Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json

 {
   "schema":
     "{ \"type\": \"record\",
        \"name\": \"test\",
        \"fields\": [
          { \"type\": \"string\", \"name\": \"field1\" },
          { \"type\": \"integer\", \"name\": \"field2\" } ]
      }"
 }

 I get the following error...

 IR-52:confluent-1.0 clint$ curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
   --data '{"schema": "{ \"type\": \"record\", \"name\": \"test\", \"fields\": [{ \"type\": \"string\", \"name\": \"field1\" }, { \"type\": \"integer\", \"name\": \"field2\" }]}"}' \
   http://localhost:8081/subjects/Clint1-key/versions
 HTTP/1.1 422
 Content-Length: 71
 Content-Type: application/vnd.schemaregistry.v1+json
 Server: Jetty(8.1.16.v20140903)

 {"error_code":42201,"message":"Input schema is an invalid Avro schema"}
 IR-52:confluent-1.0 clint$

 Please advise

 Thanks




-- 
Thanks,
Ewen


Re: How Query Topic For Metadata

2015-02-27 Thread Ewen Cheslack-Postava
You might want ZkUtils.getPartitionsForTopic. But beware that it's an
internal method that could potentially change or disappear in the future.

If you're just looking for protocol-level solutions, the metadata API has a
request that will return info about the number of partitions:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
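
For reference, a rough sketch (not from the original reply) of fetching the
partition count through the javaapi TopicMetadataRequest; the broker host and
topic name are placeholders:

    import java.util.Collections;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class PartitionCount {
        public static void main(String[] args) {
            // Placeholder broker host/port; any live broker can answer metadata requests.
            SimpleConsumer consumer =
                new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "metadata-lookup");
            TopicMetadataRequest request =
                new TopicMetadataRequest(Collections.singletonList("my-topic"));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata tm : response.topicsMetadata()) {
                System.out.println(tm.topic() + " has " + tm.partitionsMetadata().size() + " partitions");
            }
            consumer.close();
        }
    }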

On Thu, Feb 26, 2015 at 10:10 PM, Alex Melville amelvi...@g.hmc.edu wrote:

 I am writing a custom producer that needs to know information about the
 topic it's about to produce to. In particular it needs to know the number
 of partitions on the topic. Is there some utility method that returns such
 data? I am using scala v2.9.2 and kafka v8.2.0.


 Alex




-- 
Thanks,
Ewen


Re: REST/Proxy Consumer access

2015-03-05 Thread Ewen Cheslack-Postava
Yes, Confluent built a REST proxy that gives access to cluster metadata
(e.g. list topics, leaders for partitions, etc), producer (send binary or
Avro messages to any topic), and consumer (run a consumer instance and
consume messages from a topic). And you are correct, internally it uses
Jetty and Jersey.

http://confluent.io/docs/current/kafka-rest/docs/intro.html gives a pretty
detailed listing of what's supported, what isn't yet, and how to get
started. It's also open source, so if you find something you need is
missing, we'd love to get a pull request! Code is available at
https://github.com/confluentinc/kafka-rest

There's also a mailing list for Confluent's platform that you might be
interested in: https://groups.google.com/forum/#!forum/confluent-platform
If you have questions about getting started with the REST proxy, asking
there might be better so we keep this list focused on Kafka itself.

Finally, if Confluent's REST proxy doesn't work for you, we'd like to know
why, but there are also some alternatives -- see the HTTP REST section of
https://cwiki.apache.org/confluence/display/KAFKA/Clients

-Ewen

On Thu, Mar 5, 2015 at 9:55 AM, Julio Castillo 
jcasti...@financialengines.com wrote:

 I read the description of the new Confluent Platform and it briefly
 describes some REST access to a producer and a consumer.
 Does this mean there is a new process(es) running (Jetty based)?
 This process integrates both the consumer and producer libraries?

 Thanks

 Julio Castillo





-- 
Thanks,
Ewen


Re: If you run Kafka in AWS or Docker, how do you persist data?

2015-03-01 Thread Ewen Cheslack-Postava
On Fri, Feb 27, 2015 at 8:09 PM, Jeff Schroeder jeffschroe...@computer.org
wrote:

 Kafka on dedicated hosts running in docker under marathon under Mesos. It
 was a real bear to get working, but is really beautiful once I did manage
 to get it working. I simply run with a unique hostname constraint and
 number of instances = replication factor. If a broker dies and it isn't a
 hardware or network issue, marathon restarts it.

 The hardest part was that Kafka was registering to ZK with the internal (to
 docker) port. My workaround was that you have to use the same port inside
 and outside docker or it will register to ZK with whatever the port is
 inside the container.


You should be able to use advertised.host.name and advertised.port to
control this, so you aren't required to use the same port inside and
outside Docker.



 FYI this is an on premise dedicated Mesos cluster running on bare metal :)

 On Friday, February 27, 2015, James Cheng jch...@tivo.com wrote:

  Hi,
 
  I know that Netflix might be talking about Kafka on AWS at the March
  meetup, but I wanted to bring up the topic anyway.
 
  I'm sure that some people are running Kafka in AWS. Is anyone running
  Kafka within docker in production? How does that work?
 
  For both of these, how do you persist data? If on AWS, do you use EBS? Do
  you use ephemeral storage and then rely on replication? And if using
  docker, do you persist data outside the docker container and on the host
  machine?


On AWS, your choice will depend on a tradeoff of tolerance for data loss,
performance, and price sensitivity. You might be able to get better/more
predictable performance out of the ephemeral instance storage, but since
you are presumably running all instances in the same AZ you leave yourself
open to significant data loss if there's a coordinated outage. It's pretty
rare, but it does happen. With EBS you may have to do more work or spread
across more volumes to get the same throughput. Relevant quote from the
docs on provisioned IOPS: Additionally, you can stripe multiple volumes
together to achieve up to 48,000 IOPS or 800MBps when attached to larger
EC2 instances. (Note MBps not Mbps.) Other considerations: AWS has been
moving most of its instance storage to SSDs, so getting enough instance
storage space can be relatively pricey, and you can also potentially go
with a hybrid setup to get a balance of the two, but you'll need to be very
careful about partition assignment then to ensure at least one copy of
every partition ends up on an EBS-backed node.

For Docker, you probably want the data to be stored on a volume. If
possible, it would be better if non-hardware errors could be resolved just
by restarting the broker. You'll avoid a lot of needless copying of data.
Storing data in a volume would let you simply restart a new container and
have it pick up where the last one left off. The example of Postgres given
for a volume container in https://docs.docker.com/userguide/dockervolumes/
isn't too far from Kafka if you were to assume Postgres was replicating to
a slave -- you'd prefer to reuse the existing data on the existing node
(which a volume container enables), but could still handle bringing up a
new node if necessary.



 
  And related, how do you deal with broker failure? Do you simply replace
  it, and repopulate a new broker via replication? Or do you bring back up
  the broker with the persisted files?
 
  Trying to learn about what people are doing, beyond on premises and
  dedicated hardware.
 
  Thanks,
  -James
 
 

 --
 Text by Jeff, typos by iPhone




-- 
Thanks,
Ewen


Re: Broker w/ high memory due to index file sizes

2015-02-22 Thread Ewen Cheslack-Postava
If you haven't seen it yet, you probably want to look at
http://kafka.apache.org/documentation.html#java
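
The short version is that brokers usually want a relatively modest heap, with
the rest of RAM left to the OS page cache and the memory-mapped index files.
As a sketch, assuming the stock start script (which honors KAFKA_HEAP_OPTS)
and with values that depend entirely on your workload:

  export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
  bin/kafka-server-start.sh config/server.properties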

-Ewen

On Thu, Feb 19, 2015 at 10:53 AM, Zakee kzak...@netzero.net wrote:

 Well are there any measurement techniques for Memory config in brokers. We
 do have a large load, with a max throughput 200MB/s. What do you suggest as
 the recommended memory config for 5 brokers to handle such loads?

 On Wed, Feb 18, 2015 at 7:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

  40G is really huge, generally you would want more like 4G. Are you sure
 you
  need that? Not sure what you mean by lsof and index files being too
 large,
  but the index files are memory mapped so they should be able to grow
  arbitrarily large and their memory usage is not counted in the java heap
  (in fact by having such a large heap you are taking away OS memory from
  them).
 
  -Jay
 
  On Wed, Feb 18, 2015 at 4:13 PM, Zakee kzak...@netzero.net wrote:
 
   I am running a cluster of 5 brokers with 40G ms/mx for each. I found
 one
  of
   the brokers is constantly using above ~90% of memory for
 jvm.heapUsage. I
   checked from lsof output that the size of the index files for this
 broker
   is too large.
  
   Not sure what is going on with this one broker in the cluster? Why
 would
   the index file sizes be so hugely different on one broker? Any ideas?
  
  
   Regards
   Zakee
   




-- 
Thanks,
Ewen


Re: File as message's content

2015-02-26 Thread Ewen Cheslack-Postava
Kafka can accept any type of data, you just pass a byte[] to the producer
and get a byte[] back from the consumer. How you interpret it is entirely
up to your application.

But it does have limits on message size (see the message.max.bytes and
replica.fetch.max.bytes setting for brokers) and clients have limits as
well (e.g., fetch.message.max.bytes in the consumer) and since Kafka is
usually used with pretty small messages, the defaults may be too low for
your application. Those are just the settings I can think of off the top of
my head; there may be more, and you'll probably have to tweak some other
settings to achieve the best performance.
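
For example, a minimal sketch with the new Java producer (the topic name,
broker address, and file path are placeholders; you'd also need the size
limits above raised to at least the largest file you send):

  import java.nio.file.Files;
  import java.nio.file.Paths;
  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  Properties props = new Properties();
  props.put("bootstrap.servers", "broker1:9092");
  props.put("key.serializer",
      "org.apache.kafka.common.serialization.ByteArraySerializer");
  props.put("value.serializer",
      "org.apache.kafka.common.serialization.ByteArraySerializer");

  KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
  // the file is just opaque bytes to Kafka; the consumer decides how to interpret them
  byte[] fileBytes = Files.readAllBytes(Paths.get("/tmp/report.pdf"));
  producer.send(new ProducerRecord<byte[], byte[]>("files", fileBytes));
  producer.close();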


On Thu, Feb 26, 2015 at 3:21 AM, siddharth ubale siddharth.ub...@gmail.com
wrote:

 Hi ,

 Can you please let me know if we can send a file as in a pdf,jpg or Jpeg as
 a content of a message which we send via Kafka?

 Thanks,

 Siddharth Ubale




-- 
Thanks,
Ewen


Re: producer queue size

2015-03-18 Thread Ewen Cheslack-Postava
The setting you want is buffer.memory, but I don't think there's a way to
get the amount of remaining space.

The setting block.on.buffer.full controls the behavior when you run out of
space. Neither setting silently drops messages. It will either block until
there is space to add the message or throw an exception, which your
application can catch and handle however it wants.
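
A sketch of the relevant settings and error handling (values are illustrative
only; serializer config and the producer/record setup are omitted):

  Properties props = new Properties();
  props.put("bootstrap.servers", "broker1:9092");
  props.put("buffer.memory", "67108864");      // 64 MB for records not yet sent
  props.put("block.on.buffer.full", "false");  // throw instead of blocking when full

  try {
      producer.send(record);
  } catch (BufferExhaustedException e) {
      // only thrown with block.on.buffer.full=false; decide whether to drop,
      // spill elsewhere, or apply backpressure to the caller
  }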

-Ewen

On Wed, Mar 18, 2015 at 9:24 AM, sunil kalva sambarc...@gmail.com wrote:

 essentially i want to use this property queue.buffering.max.messages with
 new KafkaProducer class, and also want to access the current value of the
 queue

 SunilKalva

 On Wed, Mar 18, 2015 at 9:51 PM, sunil kalva sambarc...@gmail.com wrote:

 
  Hi
  How do i get the size of the inmemory queue which are holding messages
 and
  ready to send in async producer, i am using new KafkaProducer class in
  0.8.2.
 
  Basically instead of dropping the messages silently, i want to avoid
  sending messages if the queue is already full. I am using async
  KafkaProdcuer class.
 
  Or is there anyother better way to handle this, since i am using async
  client i can not catch the exception i think.
 
  --
  SunilKalva
 



 --
 SunilKalva




-- 
Thanks,
Ewen


Re: Hung Kafka Threads?

2015-04-13 Thread Ewen Cheslack-Postava
Parking to wait for just means the thread has been put to sleep while
waiting for some synchronized resource. In this case, ConditionObject
indicates it's probably await()ing on a condition variable. This almost
always means that thread is just waiting for notification from another
thread that there's something to do (AbstractQueuedSynchronizer is a
generic low-level synchronization utility and used for things like
BlockingQueues). The only way it would be hung in this condition is if no
other thread will ever wake it up (e.g. due to some deadlock, the thread it
relies on to wake it up dies, etc.)

With just the line given, you can't tell exactly what's going on, but
brokers especially may use very little CPU since most of what they do is
IO. Seeing these lines in a thread dump is completely normal since there
will often be threads just waiting for new work to do, so they shouldn't
worry you unless you have other reason to believe something is wrong.



On Mon, Apr 13, 2015 at 9:19 PM, Sharma, Prashant psha...@netsuite.com
wrote:


  Kafka version is 0.8.1.1. On taking a thread dump against one of our
 servers in Kafka Cluster, i see lot of threads with message below:



 SOMEID-9 id=67784 idx=0x75c tid=24485 prio=5 alive, parked,
 native_blocked, daemon

 -- Parking to wait for:
 java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject@0x100a13500

 at jrockit/vm/Locks.park0(J)V(Native Method)



 There are multiple threads with the message “Parking to wait for” with the
 same objectId- 0x100a13500.

 Does this imply that the consumer threads are  stuck or  just sitting
 idle?

 Thanks,

 Prashant





-- 
Thanks,
Ewen


Re: several questions about auto.offset.reset

2015-04-13 Thread Ewen Cheslack-Postava
On Mon, Apr 13, 2015 at 10:10 PM, bit1...@163.com bit1...@163.com wrote:

 Hi, Kafka experts:

 I have several questions about auto.offset.reset. This configuration
 parameter governs how a consumer reads messages from Kafka when there is
 no initial offset in ZooKeeper or if an offset is out of range.

 Q1. Does "no initial offset in zookeeper" mean that there isn't any consumer
 consuming the messages yet (the offset is set once the consumer starts to
 consume)?


Yes, or if you consumed messages, but auto offset commit is disabled and
you haven't explicitly committed any offsets.


 Q2: What does "offset is out of range" mean? Can you elaborate on one
 scenario where "offset is out of range" could happen?


Kafka uses a retention policy for topics to expire data and clean it up. If
some messages expire and your consumer hasn't run in awhile, the last
committed offset may no longer exist.



 auto.offset.reset has two values: smallest and largest.
 Assume one scenario: A producer has produced 10 messages to Kafka, and
 there is no consumer yet to consume them.
 Q3: If auto.offset.reset is set to smallest, does it mean that the
 consumer will read the messages from offset 0? (0 is smallest here)


Yes.


 Q4: If auto.offset.reset is set to largest, does it mean that the
 consumer will not read any message but wait until new messages come?


Also correct. This is why in the quickstart you need to use the
--from-beginning flag on the console consumer. Since the consumer is
executed after the console producer it wouldn't see any messages unless it
set auto.offset.reset to smallest, which is what --from-beginning does.
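
For reference, a sketch of where this is set on the high level consumer
(hosts and group id are placeholders):

  import java.util.Properties;
  import kafka.consumer.Consumer;
  import kafka.consumer.ConsumerConfig;
  import kafka.javaapi.consumer.ConsumerConnector;

  Properties props = new Properties();
  props.put("zookeeper.connect", "zk1:2181");
  props.put("group.id", "my-group");
  // smallest = start from the earliest available offset when there is no
  // committed offset (or it is out of range); largest (the default) = only
  // see messages produced after the consumer starts in that situation
  props.put("auto.offset.reset", "smallest");
  ConsumerConnector consumer =
      Consumer.createJavaConsumerConnector(new ConsumerConfig(props));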





 bit1...@163.com




-- 
Thanks,
Ewen


Re: Kafka 0.8.2 beta - release

2015-04-30 Thread Ewen Cheslack-Postava
That's part of the new consumer API that hasn't been released yet. The API
happens to be included in the 0.8.2.* artifacts because it is under
development, but isn't yet released -- it hasn't been mentioned in the
release notes, nor is it in the official documentation:
http://kafka.apache.org/documentation.html

That API is currently under active development and should be available in
the next release. If you want to test it out, you can use build a copy
yourself of trunk, but the high-level consumer functionality is not yet
implemented so it likely does not include everything you want. For the time
being, you probably want to use the existing high level consumer API:
http://kafka.apache.org/documentation.html#highlevelconsumerapi


On Wed, Apr 29, 2015 at 11:07 PM, Gomathivinayagam Muthuvinayagam 
sankarm...@gmail.com wrote:

 Thank you,

 It seems the following methods are not supported in KafkaConsumer. Do you
 know when they will be supported?

 public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean
 sync) {

 throw new UnsupportedOperationException();

 }

 Thanks & Regards,



 On Wed, Apr 29, 2015 at 10:52 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  It has already been released, including a minor revision to fix some
  critical bugs. The latest release is 0.8.2.1. The downloads page has
 links
  and release notes: http://kafka.apache.org/downloads.html
 
  On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam 
  sankarm...@gmail.com wrote:
 
   I see lot of interesting features with Kafka 0.8.2 beta. I am just
   wondering when that will be released. Is there any timeline for that?
  
    Thanks & Regards,
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: New producer: metadata update problem on 2 Node cluster.

2015-04-27 Thread Ewen Cheslack-Postava
Maybe add this to the description of
https://issues.apache.org/jira/browse/KAFKA-1843 ? I can't find it now, but
I think there was another bug where I described a similar problem -- in
some cases it makes sense to fall back to the list of bootstrap nodes
because you've gotten into a bad state and can't make any progress without
a metadata update but can't connect to any nodes. The leastLoadedNode
method only considers nodes in the current metadata, so in your example K1
is not considered an option after seeing the producer metadata update that
only includes K2. In KAFKA-1501 I also found another obscure edge case
where you can run into this problem if your broker hostnames/ports aren't
consistent across restarts. Yours is obviously much more likely to occur,
and may not even be that uncommon for producers that are only sending data
to one topic.

If you have logs at debug level, are you seeing this message in between the
connection attempts:

Give up sending metadata request since no node is available

Also, if you let it continue running, does it recover after the
metadata.max.age.ms timeout? If so, I think that would definitely confirm
the issue and might suggest a fix -- preserve the bootstrap metadata and
fall back to choosing a node from it when leastLoadedNode would otherwise
return null.

-Ewen

On Mon, Apr 27, 2015 at 5:40 AM, Manikumar Reddy manikumar.re...@gmail.com
wrote:

 Any comments on this issue?
 On Apr 24, 2015 8:05 PM, Manikumar Reddy ku...@nmsworks.co.in wrote:

  We are testing new producer on a 2 node cluster.
  Under some node failure scenarios, producer is not able
  to update metadata.
 
  Steps to reproduce
  1. form a 2 node cluster (K1, K2)
  2. create a topic with single partition, replication factor = 2
  3. start producing data (producer metadata : K1,K2)
  2. Kill leader node (say K1)
  3. K2 becomes the leader (producer metadata : K2)
  4. Bring back K1 and Kill K2 before metadata.max.age.ms
  5. K1 becomes the Leader (producer metadata still contains : K2)
 
  After this point, producer is not able to update the metadata.
  producer continuously trying to connect with dead node (K2).
 
  This looks like a bug to me. Am I missing anything?
 




-- 
Thanks,
Ewen


Re: Kafka 0.8.2 beta - release

2015-04-29 Thread Ewen Cheslack-Postava
It has already been released, including a minor revision to fix some
critical bugs. The latest release is 0.8.2.1. The downloads page has links
and release notes: http://kafka.apache.org/downloads.html

On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam 
sankarm...@gmail.com wrote:

 I see lot of interesting features with Kafka 0.8.2 beta. I am just
 wondering when that will be released. Is there any timeline for that?

 Thanks & Regards,




-- 
Thanks,
Ewen


Re: New producer: metadata update problem on 2 Node cluster.

2015-04-28 Thread Ewen Cheslack-Postava
Ok, all of that makes sense. The only way to possibly recover from that
state is either for K2 to come back up allowing the metadata refresh to
eventually succeed or to eventually try some other node in the cluster.
Reusing the bootstrap nodes is one possibility. Another would be for the
client to get more metadata than is required for the topics it needs in
order to ensure it has more nodes to use as options when looking for a node
to fetch metadata from. I added your description to KAFKA-1843, although it
might also make sense as a separate bug since fixing it could be considered
incremental progress towards resolving 1843.

On Tue, Apr 28, 2015 at 9:18 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 Hi Ewen,

  Thanks for the response.  I agree with you, In some case we should use
 bootstrap servers.


 
  If you have logs at debug level, are you seeing this message in between
 the
  connection attempts:
 
  Give up sending metadata request since no node is available
 

  Yes, this log came for couple of times.


 
  Also, if you let it continue running, does it recover after the
  metadata.max.age.ms timeout?
 

  It does not reconnect.  It is continuously trying to connect with dead
 node.


 -Manikumar




-- 
Thanks,
Ewen


Re: New Producer API - batched sync mode support

2015-04-27 Thread Ewen Cheslack-Postava
A couple of thoughts:

1. @Joel I agree it's not hard to use the new API but it definitely is more
verbose. If that snippet of code is being written across hundreds of
projects, that probably means we're missing an important API. Right now
I've only seen the one complaint, but it's worth finding out how many
people feel like it's missing. And given that internally each of the
returned Futures just uses the future for the entire batch, I think it's
probably worth investigating if getting rid of millions of allocs per
second is worth it, even if they should be in the nursery and fast to
collect.

2. For lots of small messages, there's definitely the potential for a
performance benefit by avoiding a lot of lock acquire/release in send(). If
you make a first pass to organize by topic partition and then process each
group, you lock # of partitions times rather than # of messages times. One
major drawback I see is that it seems to make a mess of error
handling/blocking when the RecordAccumulator runs out of space.

3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
realistic payload size for you? I can imagine applications where it is (and
we should support those well), it just sounds unusually small.

4. I reproduced Jay's benchmark blog post awhile ago in an automated test
(see
https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_benchmark_test.py).
Here's a snippet from the output on m3.2xlarge instances that might help
shed some light on the situation:
INFO:_.KafkaBenchmark:Message size:
INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)

That's using the single-threaded new ProducerPerformance class, so the
m3.2xlarge's # of cores probably has little influence. There's clearly a
sharp increase in throughput from 10 - 100 byte messages. I recall double
checking that the CPU was fully utilized. Note that this is with the acks=1
setting that doesn't actually exist anymore, so take with a grain of salt.

5. I'd suggest that there may be other APIs that give the implementation
more flexibility but still provide batching. For example:
* Require batched inputs to be prepartitioned so each call specifies the
TopicPartition. Main benefit here is that the producer avoids having to do
all the sorting, which the application may already be doing anyway.
* How about an API similar to fwrite() where you provide a set of messages
but it may only write some of them and tells you how many it wrote? This
could be a clean way to expose the underlying batching that is performed
without being a completely leaky abstraction. We could then return just a
single future for the entire batch, we'd do minimal locking, etc. Not sure
how to handle different TopicPartitions in the same set. I think this could
be a good pattern for people who want maximally efficient ordered writes
where errors are properly handled too.

6. If I recall correctly, doesn't compression occur in a synchronized
block, I think in the RecordAccumulator? Or maybe it was in the network
thread? In any case, I seem to recall compression also possibly playing an
important role in performance because it operates over a set of records
which limits where you can run it. @Roshan, are you using compression, both
in your microbenchmarks and your application?

I think there's almost definitely a good case to be made for a batch API,
but probably needs some very clear motivating use cases and perf
measurements showing why it's not going to be feasible to accomplish with
the current API + a few helpers to wrap it in a batch API.
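
For example, a sketch of the kind of helper I mean, layered on the current
API (no new producer features assumed):

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.clients.producer.RecordMetadata;

  // send a whole batch and block until every record is acknowledged,
  // surfacing the first failure to the caller
  public static void sendBatchSync(Producer<byte[], byte[]> producer,
                                   List<ProducerRecord<byte[], byte[]>> records)
          throws ExecutionException, InterruptedException {
      List<Future<RecordMetadata>> futures =
          new ArrayList<Future<RecordMetadata>>(records.size());
      for (ProducerRecord<byte[], byte[]> record : records)
          futures.add(producer.send(record));
      for (Future<RecordMetadata> future : futures)
          future.get();   // throws if that record failed
  }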

-Ewen


On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy jjkosh...@gmail.com wrote:


Fine grained tracking of status of individual events is quite painful
 in
  contrast to simply blocking on every batch. Old style Batched-sync mode
  has great advantages in terms of simplicity and performance.

 I may be missing something, but I'm not so convinced that it is that
 painful/very different from the old-style.

 In the old approach, you would compose a batch (in a list of messages)
 and do a synchronous send:

 try {
   producer.send(recordsToSend)
 }
 catch (...) {
   // handle (e.g., retry sending recordsToSend)
 }

 In the new approach, you would do (something like) this:

 for (record: recordsToSend) {
   futureList.add(producer.send(record));
 }
 producer.flush();
 for (result: futureList) {
   try { result.get(); }
   catch (...) { // handle (e.g., retry sending recordsToSend) }
 }





-- 
Thanks,
Ewen


Re: New producer: metadata update problem on 2 Node cluster.

2015-05-05 Thread Ewen Cheslack-Postava
I'm not sure about the old producer behavior in this same failure scenario,
but creating a new producer instance would resolve the issue since it would
start with the list of bootstrap nodes and, assuming at least one of them
was up, it would be able to fetch up to date metadata.

On Tue, May 5, 2015 at 5:32 PM, Jason Rosenberg j...@squareup.com wrote:

 Can you clarify, is this issue here specific to the new producer?  With
 the old producer, we routinely construct a new producer which makes a
 fresh metadata request (via a VIP connected to all nodes in the cluster).
 Would this approach work with the new producer?

 Jason


 On Tue, May 5, 2015 at 1:12 PM, Rahul Jain rahul...@gmail.com wrote:

  Mayuresh,
  I was testing this in a development environment and manually brought
 down a
  node to simulate this. So the dead node never came back up.
 
  My colleague and I were able to consistently see this behaviour several
  times during the testing.
  On 5 May 2015 20:32, Mayuresh Gharat gharatmayures...@gmail.com
 wrote:
 
   I agree that to find the least Loaded node the producer should fall
 back
  to
   the bootstrap nodes if its not able to connect to any nodes in the
  current
   metadata. That should resolve this.
  
   Rahul, I suppose the problem went off because the dead node in your
 case
   might have came back up and allowed for a metadata update. Can you
  confirm
   this?
  
   Thanks,
  
   Mayuresh
  
   On Tue, May 5, 2015 at 5:10 AM, Rahul Jain rahul...@gmail.com wrote:
  
We observed the exact same error. Not very clear about the root cause
although it appears to be related to leastLoadedNode implementation.
Interestingly, the problem went away by increasing the value of
reconnect.backoff.ms to 1000ms.
On 29 Apr 2015 00:32, Ewen Cheslack-Postava e...@confluent.io
  wrote:
   
 Ok, all of that makes sense. The only way to possibly recover from
  that
 state is either for K2 to come back up allowing the metadata
 refresh
  to
 eventually succeed or to eventually try some other node in the
  cluster.
 Reusing the bootstrap nodes is one possibility. Another would be
 for
   the
 client to get more metadata than is required for the topics it
 needs
  in
 order to ensure it has more nodes to use as options when looking
 for
  a
node
 to fetch metadata from. I added your description to KAFKA-1843,
   although
it
 might also make sense as a separate bug since fixing it could be
considered
 incremental progress towards resolving 1843.

 On Tue, Apr 28, 2015 at 9:18 AM, Manikumar Reddy 
  ku...@nmsworks.co.in
   
 wrote:

  Hi Ewen,
 
   Thanks for the response.  I agree with you, In some case we
 should
   use
  bootstrap servers.
 
 
  
   If you have logs at debug level, are you seeing this message in
between
  the
   connection attempts:
  
   Give up sending metadata request since no node is available
  
 
   Yes, this log came for couple of times.
 
 
  
   Also, if you let it continue running, does it recover after the
   metadata.max.age.ms timeout?
  
 
   It does not reconnect.  It is continuously trying to connect
 with
   dead
  node.
 
 
  -Manikumar
 



 --
 Thanks,
 Ewen

   
  
  
  
   --
   -Regards,
   Mayuresh R. Gharat
   (862) 250-7125
  
 




-- 
Thanks,
Ewen


Re: New producer: metadata update problem on 2 Node cluster.

2015-05-07 Thread Ewen Cheslack-Postava
Rahul, the mailing list filters attachments, you'd have to post the code
somewhere else for people to be able to see it.

But I don't think anyone suggested that creating a new consumer would fix
anything. Creating a new producer *and discarding the old one* basically
just makes it start from scratch using the bootstrap nodes, which is why
that would allow recovery from that condition.

But that's just a workaround. The real issue is that the producer only
maintains metadata for the nodes that are replicas for the partitions of
the topics the producer sends data to. In some cases, this is a small set
of servers and can get the producer stuck if a node goes offline and it
doesn't have any other nodes that it can try to communicate with to get
updated metadata (since the topic partitions should have a new leader).
Falling back on the original bootstrap servers is one solution to this
problem. Another would be to maintain metadata for additional servers so
you always have extra bootstrap nodes in your current metadata set, even
if they aren't replicas for any of the topics you're working with.

-Ewen



On Thu, May 7, 2015 at 12:06 AM, Rahul Jain rahul...@gmail.com wrote:

 Creating a new consumer instance *does not* solve this problem.

 Attaching the producer/consumer code that I used for testing.



 On Wed, May 6, 2015 at 6:31 AM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

 I'm not sure about the old producer behavior in this same failure
 scenario,
 but creating a new producer instance would resolve the issue since it
 would
 start with the list of bootstrap nodes and, assuming at least one of them
 was up, it would be able to fetch up to date metadata.

 On Tue, May 5, 2015 at 5:32 PM, Jason Rosenberg j...@squareup.com wrote:

  Can you clarify, is this issue here specific to the new producer?
 With
  the old producer, we routinely construct a new producer which makes a
  fresh metadata request (via a VIP connected to all nodes in the
 cluster).
  Would this approach work with the new producer?
 
  Jason
 
 
  On Tue, May 5, 2015 at 1:12 PM, Rahul Jain rahul...@gmail.com wrote:
 
   Mayuresh,
   I was testing this in a development environment and manually brought
  down a
   node to simulate this. So the dead node never came back up.
  
   My colleague and I were able to consistently see this behaviour
 several
   times during the testing.
   On 5 May 2015 20:32, Mayuresh Gharat gharatmayures...@gmail.com
  wrote:
  
I agree that to find the least Loaded node the producer should fall
  back
   to
the bootstrap nodes if its not able to connect to any nodes in the
   current
metadata. That should resolve this.
   
Rahul, I suppose the problem went off because the dead node in your
  case
might have came back up and allowed for a metadata update. Can you
   confirm
this?
   
Thanks,
   
Mayuresh
   
On Tue, May 5, 2015 at 5:10 AM, Rahul Jain rahul...@gmail.com
 wrote:
   
 We observed the exact same error. Not very clear about the root
 cause
 although it appears to be related to leastLoadedNode
 implementation.
 Interestingly, the problem went away by increasing the value of
 reconnect.backoff.ms to 1000ms.
 On 29 Apr 2015 00:32, Ewen Cheslack-Postava e...@confluent.io
   wrote:

  Ok, all of that makes sense. The only way to possibly recover
 from
   that
  state is either for K2 to come back up allowing the metadata
  refresh
   to
  eventually succeed or to eventually try some other node in the
   cluster.
  Reusing the bootstrap nodes is one possibility. Another would be
  for
the
  client to get more metadata than is required for the topics it
  needs
   in
  order to ensure it has more nodes to use as options when looking
  for
   a
 node
  to fetch metadata from. I added your description to KAFKA-1843,
although
 it
  might also make sense as a separate bug since fixing it could be
 considered
  incremental progress towards resolving 1843.
 
  On Tue, Apr 28, 2015 at 9:18 AM, Manikumar Reddy 
   ku...@nmsworks.co.in

  wrote:
 
   Hi Ewen,
  
Thanks for the response.  I agree with you, In some case we
  should
use
   bootstrap servers.
  
  
   
If you have logs at debug level, are you seeing this
 message in
 between
   the
connection attempts:
   
Give up sending metadata request since no node is available
   
  
Yes, this log came for couple of times.
  
  
   
Also, if you let it continue running, does it recover after
 the
metadata.max.age.ms timeout?
   
  
It does not reconnect.  It is continuously trying to connect
  with
dead
   node.
  
  
   -Manikumar
  
 
 
 
  --
  Thanks,
  Ewen
 

   
   
   
--
-Regards,
Mayuresh R. Gharat
(862) 250-7125

Re: Kafka Client in Rust

2015-05-10 Thread Ewen Cheslack-Postava
Added to the wiki, which required adding a new Rust section :) Thanks for
the contribution, Yousuf!

On Sun, May 10, 2015 at 6:57 PM, Yousuf Fauzan yousuffau...@gmail.com
wrote:

 Hi All,

 I have create Kafka client for Rust. The client supports Metadata, Produce,
 Fetch, and Offset requests. I plan to add support of Consumers and Offset
 management soon.

 Will it be possible to get it added to
 https://cwiki.apache.org/confluence/display/KAFKA/Clients

 Info:
 Pure Rust implementation with support for Metadata, Produce, Fetch, and
 Offset requests. Supports Gzip and Snappy compression

 Maintainer: Yousuf Fauzan (http://fauzism.co)
 Licence: MIT

 code: https://github.com/spicavigo/kafka-rust
 doc: http://fauzism.co/rustdoc/kafka/index.html

 --
 Yousuf Fauzan




-- 
Thanks,
Ewen


Re: Is there a way to know when I've reached the end of a partition (consumed all messages) when using the high-level consumer?

2015-05-10 Thread Ewen Cheslack-Postava
@Gwen- But that only works for topics that have low enough traffic that you
would ever actually hit that timeout.

The Confluent schema registry needs to do something similar to make sure it
has fully consumed the topic it stores data in so it doesn't serve stale
data. We know in our case we'll only have a single producer to the topic
(the current leader of the schema registry cluster) so we have a different
solution. We produce a message to the topic (which is 1 partition, but this
works for a topic partition too), grab the resulting offset from the
response, then consume until we see the message we produced. Obviously this
isn't ideal since we a) have to produce extra bogus messages to the topic
and b) it only works in the case where you know the consumer is also the
only producer.

The new consumer interface sort of addresses this since it has seek
functionality, where one of the options is seekToEnd. However, I think you
have to be very careful with this, especially using the current
implementation. It seeks to the end, but it also marks those messages as
consumed. This means that even if you keep track of your original position
and seek back to it, if you use background offset commits you could end up
committing incorrect offsets, crashing, and then missing some messages when
another consumer claims that partition (or just due to another consumer
joining the group).

Not sure if there are many other use cases for grabbing the offset data
with a simple API. Might mean there's a use case for either some additional
API or some utilities independent of an actual consumer instance which
allow you to easily query the state of topics/partitions.
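
For completeness, this is roughly what the OffsetRequest approach James
mentions looks like (host, port, topic, and client id are placeholders, and
you still have to locate the partition leader first):

  import java.util.HashMap;
  import java.util.Map;
  import kafka.api.PartitionOffsetRequestInfo;
  import kafka.common.TopicAndPartition;
  import kafka.javaapi.OffsetResponse;
  import kafka.javaapi.consumer.SimpleConsumer;

  SimpleConsumer consumer =
      new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "endOffsetLookup");
  TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
  Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
      new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
  // LatestTime() asks for the offset just past the last message currently in the log
  requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
  kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
      requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "endOffsetLookup");
  OffsetResponse response = consumer.getOffsetsBefore(request);
  long endOffset = response.offsets("my-topic", 0)[0];  // consume until you reach this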


On Sun, May 10, 2015 at 12:43 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 For Flume, we use the timeout configuration and catch the exception, with
 the assumption that no messages for few seconds == the end.

 On Sat, May 9, 2015 at 2:04 AM, James Cheng jch...@tivo.com wrote:

  Hi,
 
  I want to use the high level consumer to read all partitions for a topic,
  and know when I have reached the end. I know the end might be a
 little
  vague, since items keep showing up, but I'm trying to get as close as
  possible. I know that more messages might show up later, but I want to
 know
  when I've received all the items that are currently available in the
 topic.
 
  Is there a standard/recommended way to do this?
 
  I know one way to do it is to first issue an OffsetRequest for each
  partition, which would get me the last offset, and then use that
  information in my high level consumer to detect when I've reached that a
  message with that offset. Which is exactly what the SimpleConsumer
 example
  does (
 
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 ).
  That involves finding the leader for the partition, etc etc. Not hard,
 but
  a bunch of steps.
 
  I noticed that kafkacat has an option similar to what I'm looking for:
-e Exit successfully when last message received
 
  Looking at the code, it appears that a FetchRequest returns the
  HighwaterMarkOffset mark for a partition, and the API docs confirm that:
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-FetchResponse
 
  Does the Java high-level consumer expose the HighwaterMarkOffset in any
  way? I looked but I couldn't find such a thing.
 
  Thanks,
  -James
 
 




-- 
Thanks,
Ewen


Re: Kafka broker and producer max message default config

2015-05-12 Thread Ewen Cheslack-Postava
The max.request.size effectively caps the largest size message the producer
will send, but the actual purpose is, as the name implies, to limit the
size of a request, which could potentially include many messages. This
keeps the producer from sending very large requests to the broker. The
limitation on message size is just a side effect.
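
If you do raise the limits to allow larger messages, the usual advice is to
keep the related settings consistent with each other, e.g. (values are
illustrative only):

  # broker (server.properties)
  message.max.bytes=2000000
  replica.fetch.max.bytes=2000000
  # new producer
  max.request.size=2000000
  # high level consumer
  fetch.message.max.bytes=2000000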


On Tue, May 12, 2015 at 12:33 AM, Rendy Bambang Junior 
rendy.b.jun...@gmail.com wrote:

 Hi,

 I see configuration for broker max.message.bytes 1,000,000
 and configuration for producer max.request.size 1,048,576

 Why is default config for broker is less than producer? If that is the case
 then there will be message sent by producer which is bigger than what
 broker could receive.

 Could anyone please clarify my understanding?

 Rendy




-- 
Thanks,
Ewen


Re: New Producer API Design

2015-05-13 Thread Ewen Cheslack-Postava
You can of course use KafkaProducer<Object, Object> to get a producer
interface that can accept a variety of types. For example, if you have an
Avro serializer that accepts both primitive types (e.g. String, integer
types) and complex types (e.g. records, arrays, maps), Object is the only
type you can use to cover all of those. As long as your serializer supports
it, you can use a general type and pass in a variety of types to a single
producer.

The drawback is that you don't get feedback at compile time if you pass in
a type that you weren't expecting. For example, if you know your keys are
always going to be Strings, it's probably a good idea to use a
KafkaProducer<String, Object> so that you catch a case where you
accidentally pass in a different object. There are a lot of use cases where
an application is only producing a single format of data, so supporting the
type checking can be valuable.

The type checking isn't going to be perfect because of type erasure and
since serializers are often instantiated via reflection. However, having
the type information can offer some compile-time protection to application
code using the clients.
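
For example, a sketch of the two styles (assuming props is configured with
serializers that accept these types, and someRecord is whatever complex
object your serializer handles):

  // typed for the one format this application produces: misuse fails to compile
  KafkaProducer<String, Object> typed = new KafkaProducer<String, Object>(props);
  typed.send(new ProducerRecord<String, Object>("events", "user-123", someRecord));

  // fully generic: accepts anything, but gives up the compile-time checking
  KafkaProducer<Object, Object> generic = new KafkaProducer<Object, Object>(props);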

-Ewen

On Wed, May 13, 2015 at 10:03 AM, Mohit Gupta success.mohit.gu...@gmail.com
 wrote:

 Hello,

 I've a question regarding the design of the new Producer API.

 As per the design (KafkaProducer<K,V>), it seems that a separate producer
 is required for every combination of key and value type. Whereas, in the
 documentation (and elsewhere) it's recommended to create a single
 producer instance per application and share it among all the threads
 for best performance?

 One way to create only a single producer would be to use byte[] as the
 key/value type and handle the serialization in the client itself, rather than
 in the producer, similar to the example in the javadocs. But wouldn't this
 defeat the purpose of using generics in the producer?

 Specific to our use case, we have multiple types of messages, where each
 message type can have multiple custom serializers. And, a message can be
 pushed into mulitple topics with different serialization.


 --
 Best Regards,

 Mohit Gupta




-- 
Thanks,
Ewen


Re: [DISCUSS] KIP-14 Tools Standardization

2015-04-10 Thread Ewen Cheslack-Postava
Command line tools are definitely public interfaces. They should get the
same treatment as any other public interface like the APIs or protocols.
Improving and standardizing them is the right thing to do, but
compatibility is still important. Changes should really come with a
well-documented period of deprecation before any old flags are removed.

On Fri, Apr 10, 2015 at 10:26 AM, Andrew Otto ao...@wikimedia.org wrote:

 Cool, that looks great!


  On Apr 10, 2015, at 12:48, Joe Stein joe.st...@stealth.ly wrote:
 
  With KIP-4 the ability to write your admin client in any language you want
  (including building your own REST interface) is possible.
 
  The new Apache project's tools admin client is both an Interactive Shell
  and CLI
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-4.InteractiveShell/CLItool
  that was geared to solve exactly what I think the heart of KIP-14 is
  getting at here... so... maybe it already has everything you are looking
  for?
 
  ~ Joe Stein
  - - - - - - - - - - - - - - - - -
 
   http://www.stealth.ly
  - - - - - - - - - - - - - - - - -
 
  On Fri, Apr 10, 2015 at 10:28 AM, Wesley Chow w...@chartbeat.com wrote:
 
  +1 on the wikimedia kafka tool. We use it exclusively.
 
  Wes
  On Apr 10, 2015 9:32 AM, Krishna Kumar kku...@nanigans.com wrote:
 
  That seems like a better idea. It preserves the backward compatibility
 of
  existing tools, which can be updated with a warning to use the new
  commands, and that they will be retired in version 1.x - so that there
 is
  no confusion. Also avoids the one-off problem since there will only be
  one
  tool. Documentation also can be simplified.
 
 
  On 4/10/15, 9:21 AM, Andrew Otto ao...@wikimedia.org wrote:
 
  (WARNING: Unrelated but kinda related post below!)
 
 
  Cough cough, ditch the myriad of individual scripts and standardize in
  just one, or a few, that take subcommands, cough cough
  :)
 
 
  E.g.
 
 
 
 
 https://github.com/wikimedia/operations-debs-kafka/blob/debian/debian/bin/
  kafka
 
  kafka console-consumer --topic foo
 
  with ZOOKEEPER_URL as an env var is so much nicer!
 
  -Ao
 
 
  On Apr 10, 2015, at 06:00, Steve Miller st...@idrathernotsay.com wrote:
 
   I think people will thank you for fixing the inconsistent names,
  sure, but even if you just break their test tools they won't thank
 you
  for the firedrill while they revamp the testing or monitoring stuff
  they
  did on top of the current tools.  I'd rather have a somewhat-icky
 usage
  string than have to drop everything that needs tweaking, all at once.
 
  -Steve
 
  On Thu, Apr 09, 2015 at 08:56:31PM -0700, Jay Kreps wrote:
  Personally I think this is one where most people would thank us for
  fixing
  the random inconsistent names, and aside from MM most of the tools
  affected
  are just test tools.
 
  I do think jopt-simple supports providing multiple names for the
 same
  option so we could retain the old names, not sure if that screws up
  the
  usage message though.
 
  -Jay
 
  On Thu, Apr 9, 2015 at 12:40 AM, Steve Miller
  st...@idrathernotsay.com
  wrote:
 
  FWIW I like the standardization idea but just making the old
  switches
  fail
  seems like it's not the best plan.  People wrap this sort of thing
  for any
  number of reasons, and breaking all of their stuff all at once is
  not
  going
  to make them happy.  And it's not like keeping the old switches
  working for
  a while is all that challenging from a technical standpoint.
 
  Even if all this does is break stuff when you finally phase out the
  old
  switches, telling people that will happen and giving them time to
  adjust
  will make them a lot less annoyed with the Kafka community when
 that
  happens.  They may still be annoyed, mind you, just not at you.
 (-:
 
-Steve
 
 
 
  On Apr 8, 2015, at 10:56 PM, Matthew Warhaftig 
  mwarhaf...@gmail.com
 
  wrote:
 
  The Tool Standardization KIP that Jiangjie started has been
 updated
  to
  contain proposal details:
 
 
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-14+-+Tools+Standa
  rdization
 
  Any feedback is appreciated.
 
  Thanks,
  Matt
 
 
 
 
 




-- 
Thanks,
Ewen


Re: KafkaConsumer poll always returns null

2015-05-19 Thread Ewen Cheslack-Postava
The new consumer in trunk is functional when used similarly to the old
SimpleConsumer, but none of the functionality corresponding to the high
level consumer is there yet (broker-based coordination for consumer
groups). There's not a specific timeline for the next release (i.e. when
it's ready).

On Tue, May 19, 2015 at 2:26 PM, Padgett, Ben bpadg...@illumina.com wrote:

 The links below shows the code is definitely in trunk.

 Does anyone know when the source in trunk might be released?

 Thanks!

 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache
 /kafka/clients/consumer/KafkaConsumer.java#L634
 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L634


 https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache
 /kafka/clients/consumer/KafkaConsumer.java#L553
 https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L553


 On 5/19/15, 2:13 PM, Padgett, Ben bpadg...@illumina.com wrote:

 I came across this google group conversation that suggests KafkaConsumer
 will not be complete until the next release.
 (
 https://groups.google.com/forum/#!msg/kafka-clients/4VLb-_wI22c/imYRlxogo
 -kJ)
 
 
 ```
 
 org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer
 = new org.apache.kafka.clients.consumer.KafkaConsumer<String,
 String>(consumerProps);
 
 consumer.subscribe("project-created");
 
 Map<String, ConsumerRecords<String, String>> records = consumer.poll(100);
 assertNotNull(records);
 
 ```
 
 
 If I run this from the command line I receive many records:
 
 ```
 
 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
 project-created --from-beginning
 
 ```
 
 
 When using the maven dependency below it appears the older scala packages
 are not available.
 
 ```
 
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>0.8.2.0</version>
 </dependency>
 
 ```
 
 Is there a workaround for this?
 
 When is the next release expected to be released?
 
 Am I just using the consumer incorrectly?
 
 Thanks!




-- 
Thanks,
Ewen


Re: KafkaConsumer poll always returns null

2015-05-20 Thread Ewen Cheslack-Postava
I don't have any small examples handy, but the javadoc for KafkaConsumer
includes some examples. The one labeled "Simple Processing" should work
fine as long as you stick to a single consumer in the group.


On Wed, May 20, 2015 at 7:49 AM, Padgett, Ben bpadg...@illumina.com wrote:

 @Ewen Cheslack-Postava - do you have an example you could post?
 
 From: Ewen Cheslack-Postava [e...@confluent.io]
 Sent: Tuesday, May 19, 2015 3:12 PM
 To: users@kafka.apache.org
 Subject: Re: KafkaConsumer poll always returns null

 The new consumer in trunk is functional when used similarly to the old
 SimpleConsumer, but none of the functionality corresponding to the high
 level consumer is there yet (broker-based coordination for consumer
 groups). There's not a specific timeline for the next release (i.e. when
 it's ready).

 On Tue, May 19, 2015 at 2:26 PM, Padgett, Ben bpadg...@illumina.com
 wrote:

  The links below shows the code is definitely in trunk.
 
  Does anyone know when the source in trunk might be released?
 
  Thanks!
 
 
 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache
  /kafka/clients/consumer/KafkaConsumer.java#L634
  
 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L634
 
 
 
 
 https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache
  /kafka/clients/consumer/KafkaConsumer.java#L553
  
 https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L553
 
 
 
  On 5/19/15, 2:13 PM, Padgett, Ben bpadg...@illumina.com wrote:
 
  I came across this google group conversation that suggests KafkaConsumer
  will not be complete until the next release.
  (
 
 https://groups.google.com/forum/#!msg/kafka-clients/4VLb-_wI22c/imYRlxogo
  -kJ)
  
  
  ```
  
  org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer
  = new org.apache.kafka.clients.consumer.KafkaConsumer<String,
  String>(consumerProps);
  
  consumer.subscribe("project-created");
  
  Map<String, ConsumerRecords<String, String>> records =
  consumer.poll(100);
  assertNotNull(records);
  
  ```
  
  
  If I run this from the command line I receive many records:
  
  ```
  
  bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
  project-created --from-beginning
  
  ```
  
  
  When using the maven dependency below it appears the older scala
 packages
  are not available.
  
  ```
  
  <dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.8.2.0</version>
  </dependency>
  
  ```
  
  Is there a workaround for this?
  
  When is the next release expected to be released?
  
  Am I just using the consumer incorrectly?
  
  Thanks!
 
 


 --
 Thanks,
 Ewen




-- 
Thanks,
Ewen


Re: offsets.storage=kafka, dual.commit.enabled=false still requires ZK

2015-06-09 Thread Ewen Cheslack-Postava
The new consumer implementation, which should be included in 0.8.3, only
needs a bootstrap.servers setting and does not use a zookeeper connection.
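
As a sketch, the connection-related config reduces to just the broker list
(hostnames are placeholders):

  Properties props = new Properties();
  props.put("bootstrap.servers", "broker1:9092,broker2:9092");
  // no zookeeper.connect needed for the new consumer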

On Tue, Jun 9, 2015 at 1:26 PM, noah iamn...@gmail.com wrote:

 We are setting up a new Kafka project (0.8.2.1) and are trying to go
 straight to consumer offsets stored in Kafka. Unfortunately it looks like
 the Java consumer will try to connect to ZooKeeper regardless of the
 settings.

 Will/When will this dependency go away completely? It would simplify our
 deployments if our consumers didn't have to connect to ZooKeeper at all.

 P.S. I've asked this on Stack Overflow, if you would like to answer there
 for posterity:

 http://stackoverflow.com/questions/30719331/kafka-0-8-2-1-offsets-storage-kafka-still-requires-zookeeper




-- 
Thanks,
Ewen


Re: NoSuchMethodError with Consumer Instantiation

2015-06-18 Thread Ewen Cheslack-Postava
It looks like you have mixed up versions of the kafka jars:

4. kafka_2.11-0.8.3-SNAPSHOT.jar
5. kafka_2.11-0.8.2.1.jar
6. kafka-clients-0.8.2.1.jar

I think org.apache.kafka.common.utils.Utils is very new, probably post
0.8.2.1, so it's probably caused by the kafka_2.11-0.8.3-SNAPSHOT.jar being
used, and then trying to use a class which should be in the kafka-clients
jar, but since that jar is the old version kafka-clients-0.8.2.1.jar it
can't find the class.

-Ewen

On Thu, Jun 18, 2015 at 1:13 PM, Srividhya Anantharamakrishnan 
srivid...@hedviginc.com wrote:

 Sorry for spamming, but any help would be greatly appreciated!


 On Thu, Jun 18, 2015 at 10:49 AM, Srividhya Anantharamakrishnan 
 srivid...@hedviginc.com wrote:

  The following are the jars in my classpath:
 
  1. slf4j-log4j12-1.6.6.jar
  2. slf4j-api-1.6.6.jar
  3. zookeeper-3.4.6.jar
  4. kafka_2.11-0.8.3-SNAPSHOT.jar
  5. kafka_2.11-0.8.2.1.jar
  6. kafka-clients-0.8.2.1.jar
  7. metrics-core-2.2.0.jar
  8. scala-library-2.11.5.jar
  9. zkclient-0.3.jar
 
  Am I missing something?
 
  On Wed, Jun 17, 2015 at 9:15 PM, Jaikiran Pai jai.forums2...@gmail.com
  wrote:
 
  You probably have the wrong version of the Kafka jar(s) within your
  classpath. Which version of Kafka are you using and how have you setup
 the
  classpath?
 
  -Jaikiran
 
  On Thursday 18 June 2015 08:11 AM, Srividhya Anantharamakrishnan wrote:
 
  Hi,
 
  I am trying to set up Kafka in our cluster and I am running into the
  following error when Consumer is getting instantiated:
 
  java.lang.NoSuchMethodError:
 
 
 org.apache.kafka.common.utils.Utils.newThread(Ljava/lang/String;Ljava/lang/Runnable;Ljava/lang/Boolean;)Ljava/lang/Thread;
 
   at
  kafka.utils.KafkaScheduler$$anon$1.newThread(KafkaScheduler.scala:84)
 
   at
 
 
  java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:610)
 
   at
 
 
 java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:924)
 
   at
 
 
 java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1590)
 
   at
 
 
 java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:333)
 
   at
 
 
 java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570)
 
   at
 kafka.utils.KafkaScheduler.schedule(KafkaScheduler.scala:116)
 
   at
 
 
  kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:136)
 
   at
 
 
  kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
 
   at
 
 
  kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:68)
 
   at
 
 
 kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:120)
 
   at
 
 
 kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
 
 
  I am guessing that it is missing certain classpath references. If that
 is
  the reason, could someone tell me which jar is it?
 
  If not, what is it that I am missing?
 
 
  *KafkaConsumer:*
 
 
  public KafkaConsumer(String topic)
 
  {
 
  * consumer =
  Consumer.createJavaConsumerConnector(createConsumerConfig());
  //line where the error is thrown*
 
this.topic = topic;
 
  }
 
private static ConsumerConfig createConsumerConfig()
 
  {
 
Properties props = new Properties();
 
   props.put("zookeeper.connect", "IP:PORT");

   props.put("group.id", "group1");

   props.put("zookeeper.session.timeout.ms", "6000");

   props.put("zookeeper.sync.time.ms", "2000");

   props.put("auto.commit.interval.ms", "60000");
 
 
   return new ConsumerConfig(props);
 
}
 
 
  TIA!
 
 
 
 




-- 
Thanks,
Ewen


Re: No key specified when sending the message to Kafka

2015-06-23 Thread Ewen Cheslack-Postava
It does balance data, but is sticky over short periods of time (for some
definition of short...). See this FAQ for an explanation:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified
?

This behavior has been changed in the new producer to work the way you
expected, and can be overridden by providing your own Partitioner interface.

On Tue, Jun 23, 2015 at 8:28 PM, bit1...@163.com bit1...@163.com wrote:

 I have the following code snippet that uses the Kafka producer to send a
 message (no key is specified in the KeyedMessage):
 val data = new KeyedMessage[String, String](topicName, msg);
 Kafka_Producer.send(data)

 Kafka_Producer is an instance of kafka.producer.Producer.


 With the above code, I observed that the messages sent to Kafka are not
 partitioned (that is, all the messages are pushed to partition 0). If I give
 the message a key, then it can be partitioned across the topic.

 So, my question is: if no key is provided in the message, will the Kafka
 producer not automatically partition the message with some built-in
 balancing algorithm?

 Thanks







 bit1...@163.com




-- 
Thanks,
Ewen


Re: Consumer rebalancing based on partition sizes?

2015-06-23 Thread Ewen Cheslack-Postava
Current partition assignment only has a few limited options -- see the
partition.assignment.strategy consumer option (which seems to be listed
twice, see the second version for a more detailed explanation). There has
been some discussion of making assignment strategies user extensible to
support use cases like this.

Is there a reason your data is unbalanced that might be avoidable? Ideally
good hashing of keys combined with a large enough number of keys with
reasonable data distribution across keys (not necessarily uniform) leads to
a reasonable balance, although there are certainly some workloads that are
so skewed that this doesn't work out.



On Tue, Jun 23, 2015 at 7:34 PM, Joel Ohman maelstrom.thunderb...@gmail.com
 wrote:

 Hello!

 I'm working with a topic of largely variable partition sizes. My biggest
 concern is that I have no control over which keys are assigned to which
 consumers in my consumer group, as the amount of data my consumer sees is
 directly reflected in its workload. Is there a way to distribute
 partitions to consumers evenly based on the size of each partition? The
 provided Consumer Rebalancing Algorithm prioritizes assigning consumers
 even numbers of partitions, regardless of their size.

 Regards,
 Joel




-- 
Thanks,
Ewen


Re: Kafka as an event store for Event Sourcing

2015-06-13 Thread Ewen Cheslack-Postava
Daniel: By random read, I meant not reading the data sequentially as is the
norm in Kafka, not necessarily a random disk seek. That in-memory data
structure is what enables the random read. You're either going to need the
disk seek if the data isn't in the fs cache or you're trading memory to
avoid it. If it's a full index containing keys and values then you're
potentially committing to a much larger JVM memory footprint (and all the
GC issues that come with it) since you'd be storing that data in the JVM
heap. If you're only storing the keys + offset info, then you potentially
introduce random disk seeks on any CAS operation (and making page caching
harder for the OS, etc.).


On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:

 Ewen: would single-key CAS necessitate random reads? My idea was to have
 the broker maintain an in-memory table that could be rebuilt from the log
 or a snapshot.
 On lør. 13. jun. 2015 at 20.26 Ewen Cheslack-Postava e...@confluent.io
 wrote:

  Jay - I think you need broker support if you want CAS to work with
  compacted topics. With the approach you described you can't turn on
  compaction since that would make it last-writer-wins, and using any
  non-infinite retention policy would require some external process to
  monitor keys that might expire and refresh them by rewriting the data.
 
  That said, I think any addition like this warrants a lot of discussion
  about potential use cases since there are a lot of ways you could go
 adding
  support for something like this. I think this is an obvious next
  incremental step, but someone is bound to have a use case that would
  require multi-key CAS and would be costly to build atop single key CAS.
 Or,
  since the compare requires a random read anyway, why not throw in
  read-by-key rather than sequential log reads, which would allow for
  minitransactions a la Sinfonia?
 
  I'm not convinced trying to make Kafka support traditional key-value
 store
  functionality is a good idea. Compacted topics made it possible to use
 it a
  bit more in that way, but didn't change the public interface, only the
 way
  storage was implemented, and importantly all the potential additional
performance costs & data structures are isolated to background threads.
 
  -Ewen
 
  On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck 
  daniel.schierb...@gmail.com wrote:
 
   @Jay:
  
   Regarding your first proposal: wouldn't that mean that a producer
  wouldn't
   know whether a write succeeded? In the case of event sourcing, a failed
  CAS
   may require re-validating the input with the new state. Simply
 discarding
   the write would be wrong.
  
   As for the second idea: how would a client of the writer service know
  which
   writer is the leader? For example, how would a load balancer know which
  web
   app process to route requests to? Ideally, all processes would be able
 to
   handle requests.
  
   Using conditional writes would allow any producer to write and provide
   synchronous feedback to the producers.
   On fre. 12. jun. 2015 at 18.41 Jay Kreps j...@confluent.io wrote:
  
I have been thinking a little about this. I don't think CAS actually
requires any particular broker support. Rather the two writers just
  write
messages with some deterministic check-and-set criteria and all the
replicas read from the log and check this criteria before applying
 the
write. This mechanism has the downside that it creates additional
  writes
when there is a conflict and requires waiting on the full roundtrip
   (write
and then read) but it has the advantage that it is very flexible as
 to
   the
criteria you use.
   
An alternative strategy for accomplishing the same thing a bit more
efficiently is to elect leaders amongst the writers themselves. This
   would
require broker support for single writer to avoid the possibility of
   split
brain. I like this approach better because the leader for a partition
  can
then do anything they want on their local data to make the decision
 of
   what
is committed, however the downside is that the mechanism is more
   involved.
   
-Jay
   
On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin b...@kirw.in wrote:
   
 Gwen: Right now I'm just looking for feedback -- but yes, if folks
  are
 interested, I do plan to do that implementation work.

 Daniel: Yes, that's exactly right. I haven't thought much about
 per-key... it does sound useful, but the implementation seems a bit
 more involved. Want to add it to the ticket?

 On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
 daniel.schierb...@gmail.com wrote:
  Ben: your solutions seems to focus on partition-wide CAS. Have
 you
  considered per-key CAS? That would make the feature more useful
 in
  my
  opinion, as you'd greatly reduce the contention.
 
  On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira 
  gshap

Re: Kafka as an event store for Event Sourcing

2015-06-13 Thread Ewen Cheslack-Postava
If you do CAS where you compare the offset of the current record for the
key, then yes. This might work fine for applications that track key, value,
and offset. It is not quite the same as doing a normal CAS.

On Sat, Jun 13, 2015 at 12:07 PM, Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:

 But wouldn't the key-offset table be enough to accept or reject a write?
 I'm not familiar with the exact implementation of Kafka, so I may be wrong.

 On lør. 13. jun. 2015 at 21.05 Ewen Cheslack-Postava e...@confluent.io
 wrote:

  Daniel: By random read, I meant not reading the data sequentially as is
 the
  norm in Kafka, not necessarily a random disk seek. That in-memory data
  structure is what enables the random read. You're either going to need
 the
  disk seek if the data isn't in the fs cache or you're trading memory to
  avoid it. If it's a full index containing keys and values then you're
  potentially committing to a much larger JVM memory footprint (and all the
  GC issues that come with it) since you'd be storing that data in the JVM
  heap. If you're only storing the keys + offset info, then you potentially
  introduce random disk seeks on any CAS operation (and making page caching
  harder for the OS, etc.).
 
 
  On Sat, Jun 13, 2015 at 11:33 AM, Daniel Schierbeck 
  daniel.schierb...@gmail.com wrote:
 
   Ewen: would single-key CAS necessitate random reads? My idea was to
 have
   the broker maintain an in-memory table that could be rebuilt from the
 log
   or a snapshot.
   On lør. 13. jun. 2015 at 20.26 Ewen Cheslack-Postava 
 e...@confluent.io
   wrote:
  
Jay - I think you need broker support if you want CAS to work with
compacted topics. With the approach you described you can't turn on
compaction since that would make it last-writer-wins, and using any
non-infinite retention policy would require some external process to
monitor keys that might expire and refresh them by rewriting the
 data.
   
That said, I think any addition like this warrants a lot of
 discussion
about potential use cases since there are a lot of ways you could go
   adding
support for something like this. I think this is an obvious next
incremental step, but someone is bound to have a use case that would
require multi-key CAS and would be costly to build atop single key
 CAS.
   Or,
since the compare requires a random read anyway, why not throw in
read-by-key rather than sequential log reads, which would allow for
minitransactions a la Sinfonia?
   
I'm not convinced trying to make Kafka support traditional key-value
   store
functionality is a good idea. Compacted topics made it possible to
 use
   it a
bit more in that way, but didn't change the public interface, only
 the
   way
storage was implemented, and importantly all the potential additional
performance costs & data structures are isolated to background
 threads.
   
-Ewen
   
On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:
   
 @Jay:

 Regarding your first proposal: wouldn't that mean that a producer
wouldn't
 know whether a write succeeded? In the case of event sourcing, a
  failed
CAS
 may require re-validating the input with the new state. Simply
   discarding
 the write would be wrong.

 As for the second idea: how would a client of the writer service
 know
which
 writer is the leader? For example, how would a load balancer know
  which
web
 app process to route requests to? Ideally, all processes would be
  able
   to
 handle requests.

 Using conditional writes would allow any producer to write and
  provide
 synchronous feedback to the producers.
 On fre. 12. jun. 2015 at 18.41 Jay Kreps j...@confluent.io wrote:

  I have been thinking a little about this. I don't think CAS
  actually
  requires any particular broker support. Rather the two writers
 just
write
  messages with some deterministic check-and-set criteria and all
 the
  replicas read from the log and check this criteria before
 applying
   the
  write. This mechanism has the downside that it creates additional
writes
  when there is a conflict and requires waiting on the full
 roundtrip
 (write
  and then read) but it has the advantage that it is very flexible
 as
   to
 the
  criteria you use.
 
  An alternative strategy for accomplishing the same thing a bit
 more
  efficiently is to elect leaders amongst the writers themselves.
  This
 would
  require broker support for single writer to avoid the possibility
  of
 split
  brain. I like this approach better because the leader for a
  partition
can
  then do anything they want on their local data to make the
 decision
   of
 what
  is committed, however the downside is that the mechanism is more
 involved.
 
  -Jay
 
  On Fri

Re: Kafka as an event store for Event Sourcing

2015-06-13 Thread Ewen Cheslack-Postava
Jay - I think you need broker support if you want CAS to work with
compacted topics. With the approach you described you can't turn on
compaction since that would make it last-writer-wins, and using any
non-infinite retention policy would require some external process to
monitor keys that might expire and refresh them by rewriting the data.

That said, I think any addition like this warrants a lot of discussion
about potential use cases since there are a lot of ways you could go adding
support for something like this. I think this is an obvious next
incremental step, but someone is bound to have a use case that would
require multi-key CAS and would be costly to build atop single key CAS. Or,
since the compare requires a random read anyway, why not throw in
read-by-key rather than sequential log reads, which would allow for
minitransactions a la Sinfonia?

I'm not convinced trying to make Kafka support traditional key-value store
functionality is a good idea. Compacted topics made it possible to use it a
bit more in that way, but didn't change the public interface, only the way
storage was implemented, and importantly all the potential additional
performance costs & data structures are isolated to background threads.

-Ewen

On Sat, Jun 13, 2015 at 9:59 AM, Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:

 @Jay:

 Regarding your first proposal: wouldn't that mean that a producer wouldn't
 know whether a write succeeded? In the case of event sourcing, a failed CAS
 may require re-validating the input with the new state. Simply discarding
 the write would be wrong.

 As for the second idea: how would a client of the writer service know which
 writer is the leader? For example, how would a load balancer know which web
 app process to route requests to? Ideally, all processes would be able to
 handle requests.

 Using conditional writes would allow any producer to write and provide
 synchronous feedback to the producers.
 On fre. 12. jun. 2015 at 18.41 Jay Kreps j...@confluent.io wrote:

  I have been thinking a little about this. I don't think CAS actually
  requires any particular broker support. Rather the two writers just write
  messages with some deterministic check-and-set criteria and all the
  replicas read from the log and check this criteria before applying the
  write. This mechanism has the downside that it creates additional writes
  when there is a conflict and requires waiting on the full roundtrip
 (write
  and then read) but it has the advantage that it is very flexible as to
 the
  criteria you use.
 
  An alternative strategy for accomplishing the same thing a bit more
  efficiently is to elect leaders amongst the writers themselves. This
 would
  require broker support for single writer to avoid the possibility of
 split
  brain. I like this approach better because the leader for a partition can
  then do anything they want on their local data to make the decision of
 what
  is committed, however the downside is that the mechanism is more
 involved.
 
  -Jay
 
  On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin b...@kirw.in wrote:
 
   Gwen: Right now I'm just looking for feedback -- but yes, if folks are
   interested, I do plan to do that implementation work.
  
   Daniel: Yes, that's exactly right. I haven't thought much about
   per-key... it does sound useful, but the implementation seems a bit
   more involved. Want to add it to the ticket?
  
   On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
   daniel.schierb...@gmail.com wrote:
Ben: your solutions seems to focus on partition-wide CAS. Have you
considered per-key CAS? That would make the feature more useful in my
opinion, as you'd greatly reduce the contention.
   
On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira gshap...@cloudera.com
   wrote:
   
Hi Ben,
   
Thanks for creating the ticket. Having check-and-set capability will
  be
sweet :)
Are you planning to implement this yourself? Or is it just an idea
 for
the community?
   
Gwen
   
On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin b...@kirw.in wrote:
 As it happens, I submitted a ticket for this feature a couple days
   ago:

 https://issues.apache.org/jira/browse/KAFKA-2260

 Couldn't find any existing proposals for similar things, but it's
 certainly possible they're out there...

 On the other hand, I think you can solve your particular issue by
 reframing the problem: treating the messages as 'requests' or
 'commands' instead of statements of fact. In your flight-booking
 example, the log would correctly reflect that two different people
 tried to book the same flight; the stream consumer would be
 responsible for finalizing one booking, and notifying the other
  client
 that their request had failed. (In-browser or by email.)

 On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
 daniel.schierb...@gmail.com wrote:
 I've been working on an application which uses 

Re: Batch producer latencies and flush()

2015-06-28 Thread Ewen Cheslack-Postava
The logic you're requesting is basically what the new producer implements.
The first condition is the batch size limit and the second is linger.ms.
The actual logic is a bit more complicated and has some caveats dealing
with, for example, backing off after failures, but you can see in this code

https://github.com/apache/kafka/blob/0.8.2.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L222

that the two normal conditions that will trigger a send are full and
expired.
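
As a point of reference, a minimal sketch of setting those two knobs on the new
(Java) producer might look like this -- the broker address and the particular
values are placeholders, not recommendations:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");  // placeholder address
props.put("key.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("batch.size", 65536);  // max bytes per partition batch ("full")
props.put("linger.ms", 5);       // max wait before sending a non-full batch ("expired")

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<byte[], byte[]>("my-topic", "payload".getBytes()));
producer.close();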

Note that increasing batch.size and linger.ms will generally *increase*
your latency -- their effect is to make messages wait longer on the client
before being sent, trading latency for higher throughput. There may be edge
cases where this doesn't hold (e.g. high latency to the broker can make a
low linger.ms counterproductive in combination with
max.in.flight.requests.per.connection), but usually it does.

For the specific case you gave, I would guess that increasing the batch size
stopped having any effect because beyond 64KB you were never filling the
batches -- they were probably being sent out because linger.ms expired (with
few enough in-flight requests) before any batch was full.

Maybe giving some more concrete numbers for the settings and some idea of
message size + message rate in specific instances would allow people to
suggest tweaks that might help?

-Ewen

On Sun, Jun 28, 2015 at 11:17 AM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 *bump*

 On Tue, Jun 23, 2015 at 1:03 PM, Achanta Vamsi Subhash 
 achanta.va...@flipkart.com wrote:

  Hi,
 
  We are using the batch producer of 0.8.2.1 and we are getting very bad
  latencies for the topics. We have ~40K partitions now in a 20-node
 cluster.
 
  - We have many topics and each with messages published to them varying.
  Ex: some topics take 10k/sec and other 2000/minute.
  - We are seeing latencies of 99th percentile 2sec and 95th percentile of
  1sec.
  - The current parameters that are tunable are batch size, buffer size and
  linger. We monitor the metrics for the new producer and tuned the above
  accordingly. Still, we are not able to get any improvements. Batch size
 in
  a sense didn't matter after increasing from 64KB (we increased it till
 1MB).
  - We also noticed that the record queue time is high (2-3sec).
  Documentation describes that this is the time records wait in the
  accumulator to be sent.
 
  Later looking at the code in the trunk, I see that the batch size set is
  same for all the TopicPartitions and each have their own RecordBatch.
 Also,
  flush() method is added in the latest code.
 
  We want to have an upper bound on the latencies for every message push
  irrespective of the incoming rate. Can we achieve it by following logic:
 
  - Wait until X-Kb of batch size / Topic Partition is reached
  (or)
  - Wait for Y-ms
 
  If either of them is reached, flush the producer records. Can this be
 part
  of the producer code itself? This will avoid the case of records getting
  accumulated for 2-3 sec.
 
  Please correct me if the analysis is wrong and suggest me on how do we
  improve latencies of the new producer. Thanks.
 
  --
  Regards
  Vamsi Subhash
 



 --
 Regards
 Vamsi Subhash

 --



 --

 This email and any files transmitted with it are confidential and intended
 solely for the use of the individual or entity to whom they are addressed.
 If you have received this email in error please notify the system manager.
 This message contains confidential information and is intended only for the
 individual named. If you are not the named addressee you should not
 disseminate, distribute or copy this e-mail. Please notify the sender
 immediately by e-mail if you have received this e-mail by mistake and
 delete this e-mail from your system. If you are not the intended recipient
 you are notified that disclosing, copying, distributing or taking any
 action in reliance on the contents of this information is strictly
 prohibited. Although Flipkart has taken reasonable precautions to ensure no
 viruses are present in this email, the company cannot accept responsibility
 for any loss or damage arising from the use of this email or attachments




-- 
Thanks,
Ewen


Re: How Producer handles Network Connectivity Issues

2015-05-26 Thread Ewen Cheslack-Postava
It's not being switched in this case because the broker hasn't failed. It
can still connect to all the other brokers and zookeeper. The only failure
is of the link between a client and the broker.

Another way to think of this is to extend the scenario with more producers.
If I have 100 other producers and they can all still connect, would you
still consider this a failure and expect the leader to change? Since
network partitions (or periods of high latency, or long GC pauses, etc) can
happen arbitrarily and clients might be spread far and wide, you can't rely
on their connectivity as an indicator of the health of the Kafka broker.

Of course, there's also a pretty big practical issue: since the client
can't connect to the broker, how would it even report that it has a
connectivity issue?

-Ewen

On Mon, May 25, 2015 at 10:05 PM, Kamal C kamaltar...@gmail.com wrote:

 Hi,

 I have a cluster of 3 Kafka brokers and a remote producer. Producer
 started to send messages to *SampleTopic*. Then I blocked the network
 connectivity between the Producer and the leader node for the topic
 *SampleTopic* but network connectivity is healthy between the cluster and
 producer is able to reach the other two nodes.

 *With Script*

 sh kafka-topics.sh --zookeeper localhost --describe
 Topic:SampleTopicPartitionCount:1ReplicationFactor:3Configs:
 Topic: SampleTopicPartition: 0Leader: 1Replicas: 1,2,0
 Isr: 1,2,0


 Producer tries forever to reach the leader node by throwing connection
 refused exception. I understand that when there is a node failure leader
 gets switched. Why it's not switching the leader in this scenario ?

 --
 Kamal C




-- 
Thanks,
Ewen


Re: bootstrap.servers for the new Producer

2015-08-21 Thread Ewen Cheslack-Postava
Are you seeing this in practice or is this just a concern about the way the
code currently works? If the broker is actually down and the host is
rejecting connections, the situation you describe shouldn't be a problem.
It's true that the NetworkClient chooses a fixed nodeIndexOffset, but the
expectation is that if we run one iteration of leastLoadedNode and select a
node, we'll try to connect and any failure will be handled by putting that
node into a blackout period during which subsequent calls to
leastLoadedNode will give priority to other options. If your server is
*not* explicitly rejecting connections, I think it could be possible that
we end up hanging for a long while just waiting for that connection. If
this is the case (e.g., if you are running on EC2 and it has this behavior
-- I believe default firewall rules will not kill the connection), this
would be useful to know.

A couple of bugs you might want to be aware of:

https://issues.apache.org/jira/browse/KAFKA-1843 is meant to generally
address the fact that there are a lot of states that we could be in, and
the way we handle them (especially with leastLoadedNode), may not work well
in all cases. It's very difficult to be comprehensive here, so if there is
a scenario that is not failing for you, the more information you can give
about the state of the system and the producer, the better.

https://issues.apache.org/jira/browse/KAFKA-1842 might also be relevant --
right now we rely on the underlying TCP connection timeouts, but this is
definitely not ideal. They can be quite long by default, and we might want
to consider connections failed much sooner.

I also could have sworn there was a JIRA filed about the fact that the
bootstrap servers are never reused, but I can't find it at the moment -- in
some cases, if you have no better option then it would be best to revert
back to the original set of bootstrap servers for loading metadata. This
can especially become a problem in some cases where you're only producing to
one or a small number of topics and therefore only have metadata for a
couple of servers. If anything happens to those servers too quickly (within
the metadata refresh period) you might potentially get stuck with only
references to dead nodes.

-Ewen

On Fri, Aug 21, 2015 at 6:56 PM, Kishore Senji kse...@gmail.com wrote:

 If one of the broker we specify in the bootstrap servers list is down,
 there is a chance that the Producer (a brand new instance with no prior
 metadata) will never be able to publish anything to Kafka until that broker
 is up. Because the logic for getting the initial metadata is based on some
 random index to the set of bootstrap nodes and if it happens to be the down
 node, Kafka producer keeps on trying to get the metadata on that node only.
 It is never switched to another node. Without metadata, the Producer can
 never send anything.

 The nodeIndexOffset is chosen at the creation of the NetworkClient (and
 this offset is not changed when we fail to get a new connection) and so for
 getting the metadata for the first time, there is a possibility that we
 keep on trying on the broker that is down.

 This can be a problem if a broker goes down and also a Producer is
 restarted or a new instance is brought up. Is this a known issue?




-- 
Thanks,
Ewen


Re: bootstrap.servers for the new Producer

2015-08-22 Thread Ewen Cheslack-Postava
You're just seeing that exception in the debugger, not the log, right?

ConnectException is an IOException, so it should be caught by this block
https://github.com/apache/kafka/blob/0.8.2.1/clients/src/main/java/org/apache/kafka/common/network/Selector.java#L271
, logged, and then the SelectionKey should be closed. Part of the close
process adds it to the list of disconnections if there's an associated
transmission object (which there should be, it is set up in the connect()
call). This list is then processed by NetworkClient in
handleDisconnections, which is invoked after the poll() call. That, in
turn, marks the node as disconnected via the ClusterConnectionStates
object. So it should still be getting marked as disconnected.

However, maybe the issues is in the way we handle the blackout period. The
corresponding setting is ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG (
reconnect.backoff.ms). The default value is 10ms. However, the way this is
currently handled, we mark the time when we *start* the connection attempt.
If it takes more than 10ms for the connection attempt to fail, then the
blackout period wouldn't actually apply since the period would have
already elapsed. If that happened, then leastLoadedNode would indeed
continue to select the same node repeatedly.

Can you tell from the logs how long the connection attempts are taking? You
could try increasing the backoff time, although that has broader impact
that could be negative (e.g., if a broker is temporarily down and you
aren't stuck in this metadata fetch state, it increases the amount of time
before you can start producing to that broker again). However, if you can't
verify that this is the problem from the logs, it might at least help to
verify in a test environment.
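
For reference, the settings involved look roughly like the following (the values
are only examples; listing more than one bootstrap broker also reduces the chance
of a brand new producer getting stuck on a single dead host):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties props = new Properties();
// Several bootstrap brokers so initial metadata can still be fetched if one host is down.
props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
// Blackout period after a failed connection attempt before retrying that node.
// The default is 10ms; as discussed above, raising it has broader side effects.
props.put("reconnect.backoff.ms", 1000);
props.put("key.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);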

I've filed https://issues.apache.org/jira/browse/KAFKA-2459 for that issue.

-Ewen


On Fri, Aug 21, 2015 at 11:42 PM, Kishore Senji kse...@gmail.com wrote:

 Thank you Ewen. This behavior is something that I'm observing. I see in the
 logs continuous Connect failures to the dead broker.

 The important thing here is I'm starting a brand new instance of the
 Producer after a broker is down (so no prior metadata), with that down
 broker also as part of the bootstrap list. With the brand new instance all
 requests to send are blocked until the metadata is fetched. The metadata
 fetching is where I'm seeing the issue. Currently the code randomly picks a
 node to fetch the metadata and if it happens to the down node, I see the
 connect failure and then it tries to fetch metadata again from the same
 node (I do not see it going to black out because the status is always
 CONNECTING and other nodes are not yet connected). This goes on forever
 until I either bring the broker up or kill & restart the Producer; on
 restart, if it picks a different node, then it manages to get the metadata.
 Once it gets the metadata, it is fine as like you described above, it
 updates the Cluster nodes.

 This can be a problem because we have to give a standard set of bootstrap
 brokers across multiple producers whose lifecycle is not in control. The
 producers can go down and a new instance can be brought up just like the
 brokers where we expect a broker going down (so we do more partitioning and
 replications)

 I get this exception -

 java.net.ConnectException: Connection refused: no further information
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.7.0_67]
 at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
 ~[na:1.7.0_67]
 at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
 ~[kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
 [kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
 [kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
 [kafka-clients-0.8.2.1.jar:na]
 at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]

 I think the status never goes to blackout because this exception really
 happens in the poll() and not in the connect() method, which is also
 mentioned in the javadoc that the call is only initiated (and as long as
 the dns entry is there) it only fails to connect in the poll() method. And
 in the poll() method the status is not reset to DISCONNECTED and so it is
 not blacked out.


 On Fri, Aug 21, 2015 at 10:06 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  Are you seeing this in practice or is this just a concern about the way
 the
  code currently works? If the broker is actually down and the host is
  rejecting connections, the situation you describe shouldn't be a problem.
  It's true that the NetworkClient chooses a fixed nodeIndexOffset, but the
  expectation is that if we run one iteration of leastLoadedNode and
 select a
  node, we'll try to connect and any failure will be handled by putting
 that
  node into a blackout period during which subsequent calls

Re: Cache Memory Kafka Process

2015-07-28 Thread Ewen Cheslack-Postava
Nilesh,

It's expected that a lot of memory is used for cache. This makes sense
because under the hood, Kafka mostly just reads and writes data to/from
files. While Kafka does manage some in-memory data, mostly it is writing
produced data (or replicated data) to log files and then serving those same
messages to consumers directly out of the log files. It relies on OS-level
file system caching to optimize how data is managed. Operating systems are
already designed to do this well, so it's generally better to reuse this
functionality than to try to implement a custom caching layer.

So when you see most of your memory consumed as cache, that's because the
OS has used the access patterns for data in those files to select which
parts of different files seem most likely to be useful in the future. As
Daniel's link points out, it's only doing this when that memory is not
needed for some other purpose.

This approach isn't always perfect. If you have too much data to fit in
memory and you scan through it, performance will suffer. Eventually, you
will hit regions of files that are not in cache and the OS will be forced
to read those off disk, which is much slower than reading from cache.

From your description I'm not sure if you have 120 partitions *per topic*
or *total* across all topics. Let's go with the lesser, 120 partitions
total. You also mention 3 brokers. Dividing 120 partitions across 3
brokers, we get about 40 partitions each broker is a leader for, which is
data it definitely needs cached in order to serve consumers. You didn't
mention the replication factor, so let's just ignore it here and assume the
lowest possible, only 1 copy of the data. Even so, it looks like you have
~8GB of memory (based on the free -m numbers), and at 15 MB/message with 40
partitions per broker, that's only 8192/(15*40) = ~14 messages per
partition that would fit in memory, assuming it was all used for file
cache. That's not much, so if your total data stored is much larger and you
ever have to read through any old data, your throughput will likely suffer.

It's hard to say much more without understanding what your workload is
like, if you're consuming data other than what the Storm spout is
consuming, the rate at which you're producing data, etc. However, my
initial impression is that you may be trying to process too much data with
too little memory and too little disk throughput.

If you want more details, I'd suggest reading this section of the docs,
which further explains how a lot of this stuff works:
http://kafka.apache.org/documentation.html#persistence

-Ewen

On Mon, Jul 27, 2015 at 11:19 PM, Nilesh Chhapru 
nilesh.chha...@ugamsolutions.com wrote:

 Hi Ewen,

 I am using 3 brokers with 12 topic and near about 120-125 partitions
 without any replication and the message size is approx 15MB/message.

 The problem is when the cache memory increases and reaches to the max
 available the performance starts degrading also i am using Storm spot as
 consumer which  stops reading at times.

 When i do a free -m on my broker node after 1/2 - 1 hr the memory foot
 print is as follows.
 1) Physical memory - 500 MB - 600 MB
 2) Cache Memory - 6.5 GB
 3) Free Memory - 50 - 60 MB

 Regards,
 Nilesh Chhapru.

 On Monday 27 July 2015 11:02 PM, Ewen Cheslack-Postava wrote:
  Having the OS cache the data in Kafka's log files is useful since it
 means
  that data doesn't need to be read back from disk when consumed. This is
  good for the latency and throughput of consumers. Usually this caching
  works out pretty well, keeping the latest data from your topics in cache
  and only pulling older data into memory if a consumer reads data from
  earlier in the log. In other words, by leveraging OS-level caching of
  files, Kafka gets an in-memory caching layer for free.
 
  Generally you shouldn't need to clear this data -- the OS should only be
  using memory that isn't being used anyway. Is there a particular problem
  you're encountering that clearing the cache would help with?
 
  -Ewen
 
  On Mon, Jul 27, 2015 at 2:33 AM, Nilesh Chhapru 
  nilesh.chha...@ugamsolutions.com wrote:
 
  Hi All,
 
  I am facing issues with kafka broker process taking  a lot of cache
  memory, just wanted to know if the process really need that much of
  cache memory, or can i clear the OS level cache by setting a cron.
 
  Regards,
  Nilesh Chhapru.
 
 
 




-- 
Thanks,
Ewen


Re: Cache Memory Kafka Process

2015-07-29 Thread Ewen Cheslack-Postava
On Tue, Jul 28, 2015 at 11:29 PM, Nilesh Chhapru 
nilesh.chha...@ugamsolutions.com wrote:

 Hi Ewen,

 Thanks for reply.
 The assumptions that you made for replication and partitions are
 correct, 120 is total number of partitions and replication factor is 1
 for all the topics.

 Does that mean that a broker will keep all the messages that are
 produced in memory, or will only the unconsumed messages.


The operating system is caching the data, not Kafka. So there's no policy
in Kafka that controls caching at that level. If you have consumers that
repeatedly consume old data, the operating system will cache those sections
of the files. If consumers are normally at the end of the logs, the
operating system will cache those parts of the log files. (In fact, this
doesn't even happen at the granularity of messages, this cache operates at
the granularity of pages: https://en.wikipedia.org/wiki/Page_cache)

But this is a good thing. If something else needed that memory, the OS
would just get rid of the cached data, opting to read the data back from
disk if it was needed again in the future. It's very unlikely that clearing
any of this data would affect the performance of your workload. If you're
seeing degradation of performance due to memory usage, it probably means
you're simply trying to access more data than fits in memory and end up
being limited by disk throughput as data needs to be reloaded.



 is there a way we can restrict this to only x number of messages or x MB
 of total data  in memory.


This works at the operating system level. You can adjust the retention
policies, which would just delete the data (and by definition that will
also take it out of cache), but you probably don't want to lose that data
completely.

Think of it this way: if you applied the type of restriction you're talking
about, what data would you have discarded? Are any of your applications
currently accessing the data that would have been discarded, e.g. because
they are resetting to the beginning of the log and scanning through the
full data set? If the answer is yes, then another way to view the situation
is that it's your applications that are misbehaving in the sense that they
exhibit bad data access patterns that aren't actually required, resulting
in accessing more data than necessary, which doesn't fit in memory and
therefore reduces your throughput.

-Ewen



 Regards,
 Nilesh Chhapru.

 On Tuesday 28 July 2015 12:37 PM, Ewen Cheslack-Postava wrote:
  Nilesh,
 
  It's expected that a lot of memory is used for cache. This makes sense
  because under the hood, Kafka mostly just reads and writes data to/from
  files. While Kafka does manage some in-memory data, mostly it is writing
  produced data (or replicated data) to log files and then serving those
 same
  messages to consumers directly out of the log files. It relies on
 OS-level
  file system caching to optimize how data is managed. Operating systems are
  already designed to do this well, so it's generally better to reuse this
  functionality than to try to implement a custom caching layer.
 
  So when you see most of your memory consumed as cache, that's because the
  OS has used the access patterns for data in those files to select which
  parts of different files seem most likely to be useful in the future. As
  Daniel's link points out, it's only doing this when that memory is not
  needed for some other purpose.
 
  This approach isn't always perfect. If you have too much data to fit in
  memory and you scan through it, performance will suffer. Eventually, you
  will hit regions of files that are not in cache and the OS will be forced
  to read those off disk, which is much slower than reading from cache.
 
  From your description I'm not sure if you have 120 partitions *per topic*
  or *total* across all topics. Let's go with the lesser, 120 partitions
  total. You also mention 3 brokers. Dividing 120 partitions across 3
  brokers, we get about 40 partitions each broker is a leader for, which is
  data it definitely needs cached in order to serve consumers. You didn't
  mention the replication factor, so let's just ignore it here and assume
 the
  lowest possible, only 1 copy of the data. Even so, it looks like you have
  ~8GB of memory (based on the free -m numbers), and at 15 MB/message with
 40
  partitions per broker, that's only 8192/(15*40) = ~14 messages per
  partition that would fit in memory, assuming it was all used for file
  cache. That's not much, so if your total data stored is much larger and
 you
  ever have to read through any old data, your throughput will likely
 suffer.
 
  It's hard to say much more without understanding what your workload is
  like, if you're consuming data other than what the Storm spout is
  consuming, the rate at which you're producing data, etc. However, my
  initial impression is that you may be trying to process too much data
 with
  too little memory and too little disk throughput.
 
  If you want more details, I'd

Re: best way to call ReassignPartitionsCommand programmatically

2015-08-10 Thread Ewen Cheslack-Postava
It's not public API so it may not be stable between releases, but you could
try using the ReassignPartitionsCommand class directly. Or, you can see
that the code in that class is a very simple use of ZkUtils, so you could
just make the necessary calls to ZkUtils directly.

In the future, when KIP-4 (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations)
is implemented, we'll have public, supported APIs for these types of
commands.


On Wed, Aug 5, 2015 at 7:41 PM, tao xiao xiaotao...@gmail.com wrote:

 Hi,

 I have a requirement that needs to call partition reassignment inside Java
 code. At the current implementation of ReassignPartitionsCommand it expects
 a json file to be passed in. Other than generating a json file and save it
 somewhere in my code what are other options that I can invoke the command
 like passing a json string directly?




-- 
Thanks,
Ewen


Re: How to read messages from Kafka by specific time?

2015-08-10 Thread Ewen Cheslack-Postava
You can use SimpleConsumer.getOffsetsBefore to get a list of offsets before
a Unix timestamp. However, this isn't per-message. The offests returned are
for the log segments stored on the broker, so the granularity will depend
on your log rolling settings.

-Ewen

On Wed, Aug 5, 2015 at 2:11 AM, shahab shahab.mok...@gmail.com wrote:

 Hi,

 Probably this question has been already asked before, but I couldn't find
 it,

 I would like to fetch data from kafka by timestamp, and according to the Kafka
 FAQ (

 https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest
 ?)
 Kafka allows querying offsets of messages by time, I tried to use
 UnixTimeStamp instead  in the offset request, but every time I got an empty
 array, simply it didn't work.

 Based on my google search this is not possible, but Kafka FAQ states that
 this is possible!
 Does any one know how to do this? I do appreciate it.

 best,
 /Shahab




-- 
Thanks,
Ewen


Re: how to get single record from kafka topic+partition @ specified offset

2015-08-10 Thread Ewen Cheslack-Postava
Right now I think the only place the new API is documented is in the
javadocs. Here are the relevant sections for replacing the simple consumer.

Subscribing to specific partitions:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L204
Seeking to specific partitions:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L282

With the new API you'll just need to do something like this:

TopicPartition tp = new TopicPartition(topic, 1);
long offset = 100;

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<Object, Object>(props);
consumer.subscribe(tp);
consumer.seek(tp, offset);
while(true) {
   ConsumerRecords<Object, Object> records = consumer.poll(100);  // timeout in ms
   if (!records.isEmpty()) {
  // records[0] will be the message you wanted
  break;
   }
}



On Mon, Aug 10, 2015 at 3:52 PM, Joe Lawson 
jlaw...@opensourceconnections.com wrote:

 Ewen,

 Do you have an example or link for the changes/plans that will bring the
 benefits you describe?

 Cheers,

 Joe Lawson
 On Aug 10, 2015 3:27 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

  You can do this using the SimpleConsumer. See
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
  for details with some code.
 
  When the new consumer is released in 0.8.3, this will get a *lot*
 simpler.
 
  -Ewen
 
  On Fri, Aug 7, 2015 at 9:26 AM, Padgett, Ben bpadg...@illumina.com
  wrote:
 
   Does anyone have an example of how to get a single record from a
   topic+partition given a specific offset?
  
   I am interested in this for some retry logic for failed messages.
  
   Thanks!
  
  
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: how to get single record from kafka topic+partition @ specified offset

2015-08-10 Thread Ewen Cheslack-Postava
You can do this using the SimpleConsumer. See
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
for details with some code.

When the new consumer is released in 0.8.3, this will get a *lot* simpler.
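
In case it helps, a trimmed-down sketch of the relevant part of that example
(leader lookup, error handling and retries omitted; the host, topic and offset
values are placeholders):

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "retry-reader");
long targetOffset = 12345L;  // offset of the message you want to re-read

FetchRequest req = new FetchRequestBuilder()
    .clientId("retry-reader")
    .addFetch("my-topic", 0, targetOffset, 100000)  // topic, partition, offset, fetch size
    .build();
FetchResponse response = consumer.fetch(req);
for (MessageAndOffset messageAndOffset : response.messageSet("my-topic", 0)) {
    if (messageAndOffset.offset() == targetOffset) {
        // messageAndOffset.message() holds the single record you were after
        break;
    }
}
consumer.close();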

-Ewen

On Fri, Aug 7, 2015 at 9:26 AM, Padgett, Ben bpadg...@illumina.com wrote:

 Does anyone have an example of how to get a single record from a
 topic+partition given a specific offset?

 I am interested in this for some retry logic for failed messages.

 Thanks!




-- 
Thanks,
Ewen


Re: problem about the offset

2015-08-10 Thread Ewen Cheslack-Postava
Kafka doesn't track per-message timestamps. The request you're using gets a
list of offsets for *log segments* with timestamps earlier than the one you
specify. If you start consuming from the offset returned, you should find
the timestamp you specified in the same log file.

-Ewen

On Mon, Aug 10, 2015 at 2:21 AM, jinhong lu lujinho...@gmail.com wrote:

 Hi, all

 I try to use SimpleConsumer following the example at
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example .

I modify the offset in the code:
 long readOffset = getLastOffset(consumer,a_topic, a_partition,
 kafka.api.OffsetRequest.EarliestTime(), clientName);

   It works well when I use kafka.api.OffsetRequest.EarliestTime() or
 kafka.api.OffsetRequest.LatestTime(). But when I set it to a UNIX
 TIMESTAMP, it does not return the message at that moment.

For example,
 long readOffset = getLastOffset(consumer,a_topic, a_partition,
 143919600L, clientName);

   I set the timestamp to  143919600L, it means 2015/8/10 16:40:0. But
 it returns the message from about one hour before that time.

 (1) Is it the right way to assign the timestamp? The timestamp should be
 13 digits, not 10 digits, right?
 (2) I am in China, using Beijing time; does that have an effect?
 (3) Or is it possible that kafka has a parameter to set the time of the
 cluster?

 thanks a lot.


 BR//lujinhong




-- 
Thanks,
Ewen


Re: Kafka java consumer

2015-08-14 Thread Ewen Cheslack-Postava
There's not a precise date for the release, ~1.5 or 2 months from now.

On Fri, Aug 14, 2015 at 3:45 PM, Abhijith Prabhakar abhi.preda...@gmail.com
 wrote:

 Thanks Ewen.  Any idea when we can expect 0.8.3?


  On Aug 14, 2015, at 5:36 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  Hi Abhijith,
 
  You should be using KafkaProducer, but KafkaConsumer is not ready yet.
 The
  APIs are included in 0.8.2.1, but the implementation is not ready. Until
  0.8.3 is released, you cannot rely only on kafka-clients if you want to
  write a consumer. You'll need to depend on the main kafka jar and use
  kafka.consumer.Consumer, as described on that wiki page. It has not been
  deprecated yet since the new consumer implementation is not ready yet.
 
  -Ewen
 
  On Fri, Aug 14, 2015 at 2:17 PM, Abhijith Prabhakar 
 abhi.preda...@gmail.com
  wrote:
 
  Hi All,
 
  I am newbie to Kafka and was looking to use java client implementation
  org.apache.kafka:kafka-clients:0.8.2.1.  I was trying to write a
 consumer
  group using example given here:
 
 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 
  I see couple of issues here.
 
  1.  Above confluence page uses kafka.consumer.Consumer which seems to be
  deprecated and taken out in 0.8.2.1.
  2. I realized that in documentation it mentions that 0.8.2 only has
  Producer implementation inside Java client. But I also see
  org/apache/kafka/clients/consumer/KafkaConsumer in this 0.8.2.1 version.
  Not sure if this is ready to be used.  Also javadoc on this class is
  different than 0.8.3
 
 
 http://kafka.apache.org/083/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
 
 
  Can someone please let me know if using KafkaConsumer is a good idea?
 If
  yes, then please point me to an example.
 
  Thanks
  Abhi
 
 
 
 
  --
  Thanks,
  Ewen




-- 
Thanks,
Ewen


Re: Kafka java consumer

2015-08-14 Thread Ewen Cheslack-Postava
Hi Abhijith,

You should be using KafkaProducer, but KafkaConsumer is not ready yet. The
APIs are included in 0.8.2.1, but the implementation is not ready. Until
0.8.3 is released, you cannot rely only on kafka-clients if you want to
write a consumer. You'll need to depend on the main kafka jar and use
kafka.consumer.Consumer, as described on that wiki page. It has not been
deprecated yet since the new consumer implementation is not ready yet.
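
A stripped-down sketch of what that currently looks like with the old high-level
consumer (the ZooKeeper address, group id and topic here are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");  // placeholder
props.put("group.id", "my-consumer-group");        // placeholder

ConsumerConnector connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
Map<String, List<KafkaStream<byte[], byte[]>>> streams =
    connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
// This loop blocks waiting for new messages; record.message() is the raw value.
for (MessageAndMetadata<byte[], byte[]> record : streams.get("my-topic").get(0)) {
    // process record.message(); record.offset() and record.partition() are also available
}
connector.shutdown();  // call on shutdown to release the group's partitions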

-Ewen

On Fri, Aug 14, 2015 at 2:17 PM, Abhijith Prabhakar abhi.preda...@gmail.com
 wrote:

 Hi All,

 I am newbie to Kafka and was looking to use java client implementation
 org.apache.kafka:kafka-clients:0.8.2.1.  I was trying to write a consumer
 group using example given here:
 https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

 I see couple of issues here.

 1.  Above confluence page uses kafka.consumer.Consumer which seems to be
 deprecated and taken out in 0.8.2.1.
 2. I realized that in documentation it mentions that 0.8.2 only has
 Producer implementation inside Java client. But I also see
 org/apache/kafka/clients/consumer/KafkaConsumer in this 0.8.2.1 version.
 Not sure if this is ready to be used.  Also javadoc on this class is
 different than 0.8.3

 http://kafka.apache.org/083/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
 

 Can someone please let me know if using KafkaConsumer is a good idea?  If
 yes, then please point me to an example.

 Thanks
 Abhi




-- 
Thanks,
Ewen


Re: Best practices - Using kafka (with http server) as source-of-truth

2015-07-27 Thread Ewen Cheslack-Postava
Hi Prabhjot,

Confluent has a REST proxy with docs that may give some guidance:
http://docs.confluent.io/1.0/kafka-rest/docs/intro.html The new producer
that it uses is very efficient, so you should be able to get pretty good
throughput. You take a bit of a hit due to the overhead of sending data
through a proxy, but with appropriate batching you can get about 2/3 of the
performance you would get using the Java producer directly.

There are also a few other proxies you can find here:
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-HTTPREST

You can also put nginx (or HAProxy, or a variety of other solutions) in
front of REST proxies for load balancing, HA, SSL termination, etc. This is
yet another hop, so it might affect throughput and latency.

-Ewen

On Mon, Jul 27, 2015 at 6:55 AM, Prabhjot Bharaj prabhbha...@gmail.com
wrote:

 Hi Folks,

 I would like to understand the best practices when using kafka as the
 source-of-truth, given the fact that I want to pump in data to Kafka using
 http methods.

 What are the current production configurations for such a use case:-

 1. Kafka-http-client - is it scalable the way Nginx is ??
 2. Using Kafka and Nginx together - If anybody has used this, please
 explain
 3. Any other scalable method ?

 Regards,
 prabcs




-- 
Thanks,
Ewen


Re: Cache Memory Kafka Process

2015-07-27 Thread Ewen Cheslack-Postava
Having the OS cache the data in Kafka's log files is useful since it means
that data doesn't need to be read back from disk when consumed. This is
good for the latency and throughput of consumers. Usually this caching
works out pretty well, keeping the latest data from your topics in cache
and only pulling older data into memory if a consumer reads data from
earlier in the log. In other words, by leveraging OS-level caching of
files, Kafka gets an in-memory caching layer for free.

Generally you shouldn't need to clear this data -- the OS should only be
using memory that isn't being used anyway. Is there a particular problem
you're encountering that clearing the cache would help with?

-Ewen

On Mon, Jul 27, 2015 at 2:33 AM, Nilesh Chhapru 
nilesh.chha...@ugamsolutions.com wrote:

 Hi All,

 I am facing issues with kafka broker process taking  a lot of cache
 memory, just wanted to know if the process really need that much of
 cache memory, or can i clear the OS level cache by setting a cron.

 Regards,
 Nilesh Chhapru.




-- 
Thanks,
Ewen


Re: deleting data automatically

2015-07-27 Thread Ewen Cheslack-Postava
As I mentioned, adjusting settings such that files become small enough that
you lose the benefits of append-only writes, or such that file
creation/deletion becomes a bottleneck, might affect performance.
like the default setting for log.segment.bytes is 1GB, so given fast enough
cleanup of old logs, you may not need to adjust that setting -- assuming
you have a reasonable amount of storage, you'll easily fit many dozen log
files of that size.
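
For reference, the broker settings involved look along these lines in
server.properties -- the values below are only an illustration of aggressive
cleanup, not a recommendation:

# Delete log segments older than ~10 seconds (log.retention.bytes can cap by size instead)
log.retention.ms=10000
# Roll to a new segment often enough that old data becomes eligible for deletion quickly
log.segment.bytes=104857600
log.roll.ms=10000
# How often the broker checks whether any segment can be deleted
log.retention.check.interval.ms=5000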

-Ewen

On Mon, Jul 27, 2015 at 10:36 AM, Yuheng Du yuheng.du.h...@gmail.com
wrote:

 Thank you! what performance impacts will it be if I change
 log.segment.bytes? Thanks.

 On Mon, Jul 27, 2015 at 1:25 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

  I think log.cleanup.interval.mins was removed in the first 0.8 release.
 It
  sounds like you're looking at outdated docs. Search for
  log.retention.check.interval.ms here:
  http://kafka.apache.org/documentation.html
 
  As for setting the values too low hurting performance, I'd guess it's
  probably only an issue if you set them extremely small, such that file
  creation and cleanup become a bottleneck.
 
  -Ewen
 
  On Mon, Jul 27, 2015 at 10:03 AM, Yuheng Du yuheng.du.h...@gmail.com
  wrote:
 
   If I want to get higher throughput, should I increase the
   log.segment.bytes?
  
   I don't see log.retention.check.interval.ms, but there is
   log.cleanup.interval.mins, is that what you mean?
  
   If I set log.roll.ms or log.cleanup.interval.mins too small, will it
  hurt
   the throughput? Thanks.
  
   On Fri, Jul 24, 2015 at 11:03 PM, Ewen Cheslack-Postava 
  e...@confluent.io
   
   wrote:
  
You'll want to set the log retention policy via
log.retention.{ms,minutes,hours} or log.retention.bytes. If you want
   really
aggressive collection (e.g., on the order of seconds, as you
  specified),
you might also need to adjust log.segment.bytes/log.roll.{ms,hours}
 and
log.retention.check.interval.ms.
   
On Fri, Jul 24, 2015 at 12:49 PM, Yuheng Du 
 yuheng.du.h...@gmail.com
wrote:
   
 Hi,

 I am testing the kafka producer performance. So I created a queue
 and
 writes a large amount of data to that queue.

 Is there a way to delete the data automatically after some time,
 say
 whenever the data size reaches 50GB or the retention time exceeds
 10
 seconds, it will be deleted so my disk won't get filled and new
 data
can't
 be written in?

 Thanks.!

   
   
   
--
Thanks,
Ewen
   
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: deleting data automatically

2015-07-27 Thread Ewen Cheslack-Postava
I think log.cleanup.interval.mins was removed in the first 0.8 release. It
sounds like you're looking at outdated docs. Search for
log.retention.check.interval.ms here:
http://kafka.apache.org/documentation.html

As for setting the values too low hurting performance, I'd guess it's
probably only an issue if you set them extremely small, such that file
creation and cleanup become a bottleneck.

-Ewen

On Mon, Jul 27, 2015 at 10:03 AM, Yuheng Du yuheng.du.h...@gmail.com
wrote:

 If I want to get higher throughput, should I increase the
 log.segment.bytes?

 I don't see log.retention.check.interval.ms, but there is
 log.cleanup.interval.mins, is that what you mean?

 If I set log.roll.ms or log.cleanup.interval.mins too small, will it hurt
 the throughput? Thanks.

 On Fri, Jul 24, 2015 at 11:03 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  You'll want to set the log retention policy via
  log.retention.{ms,minutes,hours} or log.retention.bytes. If you want
 really
  aggressive collection (e.g., on the order of seconds, as you specified),
  you might also need to adjust log.segment.bytes/log.roll.{ms,hours} and
  log.retention.check.interval.ms.
 
  On Fri, Jul 24, 2015 at 12:49 PM, Yuheng Du yuheng.du.h...@gmail.com
  wrote:
 
   Hi,
  
   I am testing the kafka producer performance. So I created a queue and
   writes a large amount of data to that queue.
  
   Is there a way to delete the data automatically after some time, say
   whenever the data size reaches 50GB or the retention time exceeds 10
   seconds, it will be deleted so my disk won't get filled and new data
  can't
   be written in?
  
   Thanks.!
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: wow--kafka--why? unresolved dependency: com.eed3si9n#sbt-assembly;0.8.8: not found

2015-07-23 Thread Ewen Cheslack-Postava
 on no_buildscript class cache for script
 '/var/kafka/gradle/buildscript.gradle'

 (/root/.gradle/caches/2.5/scripts/buildscript_1vxtwern8bk8c0aam5ho9cjpa/DefaultScript/no_buildscript).
 04:32:54.594 [DEBUG] [org.gradle.cache.internal.DefaultCacheAccess] Cache
 Plugin Resolution Cache (/root/.gradle/caches/2.5/plugin-resolution) was
 closed 0 times.
 04:32:54.599 [DEBUG]
 [org.gradle.cache.internal.btree.BTreePersistentIndexedCache] Closing cache
 artifact-at-url.bin
 (/root/.gradle/caches/modules-2/metadata-2.15/artifact-at-url.bin)
 04:32:54.627 [DEBUG]
 [org.gradle.cache.internal.btree.BTreePersistentIndexedCache] Closing cache
 artifact-at-repository.bin
 (/root/.gradle/caches/modules-2/metadata-2.15/artifact-at-repository.bin)
 04:32:54.629 [DEBUG]
 [org.gradle.cache.internal.btree.BTreePersistentIndexedCache] Closing cache
 module-metadata.bin
 (/root/.gradle/caches/modules-2/metadata-2.15/module-metadata.bin)
 04:32:54.633 [DEBUG] [org.gradle.cache.internal.DefaultFileLockManager]
 Releasing lock on artifact cache (/root/.gradle/caches/modules-2).
 04:32:54.636 [DEBUG]

 [org.gradle.api.internal.artifacts.ivyservice.resolveengine.store.CachedStoreFactory]
 Resolution result cache closed. Cache reads: 0, disk reads: 0 (avg: 0.0
 secs, total: 0.0 secs)
 04:32:54.638 [DEBUG]

 [org.gradle.api.internal.artifacts.ivyservice.resolveengine.store.CachedStoreFactory]
 Resolved configuration cache closed. Cache reads: 0, disk reads: 0 (avg:
 0.0 secs, total: 0.0 secs)
 04:32:54.647 [DEBUG]

 [org.gradle.api.internal.artifacts.ivyservice.resolveengine.store.ResolutionResultsStoreFactory]
 Deleted 2 resolution results binary files in 0.012 secs
 04:32:54.648 [DEBUG]

 [org.gradle.api.internal.artifacts.ivyservice.ivyresolve.memcache.InMemoryCachedRepositoryFactory]
 In-memory dependency metadata cache closed. Repos cached: 2, cache
 instances: 2, modules served from cache: 0, artifacts: 0
 04:32:54.649 [DEBUG] [org.gradle.launcher.daemon.server.exec.ExecuteBuild]
 The daemon has finished executing the build.
 04:32:54.856 [DEBUG]
 [org.gradle.launcher.daemon.client.DaemonClientInputForwarder] Dispatching
 close input message: CloseInput[id=fd029ae1-df9b-4914-bdcb-bbd601ce36d6.2]
 04:32:54.858 [DEBUG]
 [org.gradle.launcher.daemon.client.DaemonClientConnection] thread 16:
 dispatching class org.gradle.launcher.daemon.protocol.CloseInput
 04:32:54.863 [INFO] [org.gradle.launcher.daemon.client.DaemonClient]
 Received result
 CommandFailure[value=org.gradle.initialization.ReportedException:
 org.gradle.internal.exceptions.LocationAwareException: A problem occurred
 configuring root project 'kafka'.] from daemon DaemonInfo{pid=26478,
 address=[184687ba-2c47-419b-93ff-c7754cbd4770 port:38024,
 addresses:[/0:0:0:0:0:0:0:1%1, /127.0.0.1]], idle=false,

 context=DefaultDaemonContext[uid=a0fd5fdb-d325-4cb2-a40d-5800b5f966ee,javaHome=/usr/lib/jvm/java-7-openjdk-amd64,daemonRegistryDir=/root/.gradle/daemon,pid=26478,idleTimeout=12,daemonOpts=-XX:MaxPermSize=512m,-Xmx1024m,-Dfile.encoding=UTF-8,-Duser.country=PH,-Duser.language=en,-Duser.variant]}.
 04:32:54.865 [DEBUG] [org.gradle.launch

 On Fri, Jul 24, 2015 at 3:34 AM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

  Also, the branch you're checking out is very old. If you want the most
  recent release, that's tagged as 0.8.2.1. Otherwise, you'll want to use
 the
  trunk branch.
 
  -Ewen
 
  On Thu, Jul 23, 2015 at 11:45 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Sorry, we don't actually do SBT builds anymore.
  
   You can build successfully using Gradle:
  
   You need to have [gradle](http://www.gradle.org/installation)
 installed.
  
   ### First bootstrap and download the wrapper ###
   cd kafka_source_dir
   gradle
  
   Now everything else will work
  
   ### Building a jar and running it ###
   ./gradlew jar
  
  
   Can you let us know where you saw the SBT instructions, so we can fix
 it?
  
   On Thu, Jul 23, 2015 at 11:39 AM, David Montgomery
   davidmontgom...@gmail.com wrote:
Just wondering I am getting this very disapointing error with kafka
   install.
   
git clone https://git-wip-us.apache.org/repos/asf/kafka.git
cd kafka
git checkout -b 0.8 remotes/origin/0.8
./sbt ++2.9.2 update
   
   
Thanks
   
   
[warn]
   
  
 
 http://repo1.maven.org/maven2/com/jsuereth/xsbt-gpg-plugin_2.9.2_0.12/0.6/xsbt-gpg-plugin-0.6.pom
[info] Resolving org.scala-sbt#precompiled-2_10_0-m7;0.12.1 ...
[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: com.eed3si9n#sbt-assembly;0.8.8: not found
[warn] :: com.jsuereth#xsbt-gpg-plugin;0.6: not found
[warn] ::
[warn]
[warn] Note: Some unresolved dependencies have extra attributes.
   Check
that these dependencies exist with the requested

Re: wow--kafka--why? unresolved dependency: com.eed3si9n#sbt-assembly;0.8.8: not found

2015-07-23 Thread Ewen Cheslack-Postava
Also, the branch you're checking out is very old. If you want the most
recent release, that's tagged as 0.8.2.1. Otherwise, you'll want to use the
trunk branch.

-Ewen

On Thu, Jul 23, 2015 at 11:45 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 Sorry, we don't actually do SBT builds anymore.

 You can build successfully using Gradle:

 You need to have [gradle](http://www.gradle.org/installation) installed.

 ### First bootstrap and download the wrapper ###
 cd kafka_source_dir
 gradle

 Now everything else will work

 ### Building a jar and running it ###
 ./gradlew jar


 Can you let us know where you saw the SBT instructions, so we can fix it?

 On Thu, Jul 23, 2015 at 11:39 AM, David Montgomery
 davidmontgom...@gmail.com wrote:
  Just wondering, I am getting this very disappointing error with kafka
 install.
 
  git clone https://git-wip-us.apache.org/repos/asf/kafka.git
  cd kafka
  git checkout -b 0.8 remotes/origin/0.8
  ./sbt ++2.9.2 update
 
 
  Thanks
 
 
  [warn]
 
 http://repo1.maven.org/maven2/com/jsuereth/xsbt-gpg-plugin_2.9.2_0.12/0.6/xsbt-gpg-plugin-0.6.pom
  [info] Resolving org.scala-sbt#precompiled-2_10_0-m7;0.12.1 ...
  [warn] ::
  [warn] ::  UNRESOLVED DEPENDENCIES ::
  [warn] ::
  [warn] :: com.eed3si9n#sbt-assembly;0.8.8: not found
  [warn] :: com.jsuereth#xsbt-gpg-plugin;0.6: not found
  [warn] ::
  [warn]
  [warn] Note: Some unresolved dependencies have extra attributes.
 Check
  that these dependencies exist with the requested attributes.
  [warn] com.eed3si9n:sbt-assembly:0.8.8 (sbtVersion=0.12,
  scalaVersion=2.9.2)
  [warn] com.jsuereth:xsbt-gpg-plugin:0.6 (sbtVersion=0.12,
  scalaVersion=2.9.2)
  [warn]
  sbt.ResolveException: unresolved dependency:
  com.eed3si9n#sbt-assembly;0.8.8: not found
  unresolved dependency: com.jsuereth#xsbt-gpg-plugin;0.6: not found
  at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:214)
  at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:122)
  at sbt.IvyActions$$anonfun$update$1.apply(IvyActions.scala:121)
  at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:114)
  at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:114)
  at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:102)
  at sbt.IvySbt.liftedTree1$1(Ivy.scala:49)
  at sbt.IvySbt.action$1(Ivy.scala:49)
  at sbt.IvySbt$$anon$3.call(Ivy.scala:58)
  at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:75)
  at xsbt.boot.Locks$GlobalLock.withChannelRetries$1(Locks.scala:58)
  at
  xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:79)
  at xsbt.boot.Using$.withResource(Using.scala:11)
  at xsbt.boot.Using$.apply(Using.scala:10)
  at xsbt.boot.Locks$GlobalLock.liftedTree1$1(Locks.scala:51)
  at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:51)
  at xsbt.boot.Locks$.apply0(Locks.scala:30)
  at xsbt.boot.Locks$.apply(Locks.scala:27)
  at sbt.IvySbt.withDefaultLogger(Ivy.scala:58)
  at sbt.IvySbt.withIvy(Ivy.scala:99)
  at sbt.IvySbt.withIvy(Ivy.scala:95)
  at sbt.IvySbt$Module.withModule(Ivy.scala:114)
  at sbt.IvyActions$.update(IvyActions.scala:121)
  at sbt.Classpaths$$anonfun$work$1$1.apply(Defaults.scala:951)
  at sbt.Classpaths$$anonfun$work$1$1.apply(Defaults.scala:949)
  at
  sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$54.apply(Defaults.scala:972)
  at
  sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$54.apply(Defaults.scala:970)
  at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
  at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:974)
  at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:969)
  at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
  at sbt.Classpaths$.cachedUpdate(Defaults.scala:977)
  at sbt.Classpaths$$anonfun$45.apply(Defaults.scala:856)
  at sbt.Classpaths$$anonfun$45.apply(Defaults.scala:853)
  at sbt.Scoped$$anonfun$hf10$1.apply(Structure.scala:586)
  at sbt.Scoped$$anonfun$hf10$1.apply(Structure.scala:586)
  at scala.Function1$$anonfun$compose$1.apply(Function1.scala:49)
  at
 
 sbt.Scoped$Reduced$$anonfun$combine$1$$anonfun$apply$12.apply(Structure.scala:311)
  at
 
 sbt.Scoped$Reduced$$anonfun$combine$1$$anonfun$apply$12.apply(Structure.scala:311)
  at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:41)
  at sbt.std.Transform$$anon$5.work(System.scala:71)
  at
  sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:232)
  at
  sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:232)
  at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
  at sbt.Execute.work(Execute.scala:238)
  at 

Re: Choosing brokers when creating topics

2015-07-27 Thread Ewen Cheslack-Postava
Try the --replica-assignment option for kafka-topics.sh. It allows you to
specify which brokers to assign as replicas instead of relying on the
assignments being made automatically.
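
For example, something like this should work (a sketch assuming broker ids 0,
1 and 2 -- the option takes one colon-separated replica list per partition):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test \
  --replica-assignment 0:1,1:2,2:0

That creates three partitions with two replicas each, placed only on those
brokers (the first broker listed for each partition is the preferred leader).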

-Ewen

On Mon, Jul 27, 2015 at 12:25 AM, Jilin Xie jilinxie1...@gmail.com wrote:

 Hi
   Is it possible to choose which brokers to use when creating a topic?
 The general command of creating topic is:

 *bin/kafka-topics.sh --create --zookeeper localhost:2181
 --replication-factor 1 --partitions 1 --topic test*

 What I'm looking for is:

 *bin/kafka-topics.sh --create .  --broker-to-use xxx;xxx;xxx*


 *It's because, I want the topic to be hosted on the brokers which
 would be closest to the possible producer.*


 *   Thanks in advance.*




-- 
Thanks,
Ewen


Re: New producer hangs infinitely when it loses connection to Kafka cluster

2015-07-21 Thread Ewen Cheslack-Postava
This is a known issue. There are a few relevant JIRAs and a KIP:

https://issues.apache.org/jira/browse/KAFKA-1788
https://issues.apache.org/jira/browse/KAFKA-2120
https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient

-Ewen

On Tue, Jul 21, 2015 at 7:05 AM, Stevo Slavić ssla...@gmail.com wrote:

 Hello Apache Kafka community,

 Just noticed that :
 - message is successfully published using new 0.8.2.1 producer
 - and then Kafka is stopped

 next attempt to publish message using same instance of new producer hangs
 forever, and following stacktrace gets logged repeatedly:

 [WARN ] [o.a.kafka.common.network.Selector] [] Error in I/O with localhost/
 127.0.0.1
 java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_31]
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
 ~[na:1.8.0_31]
 at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
 ~[kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
 [kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
 [kafka-clients-0.8.2.1.jar:na]
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
 [kafka-clients-0.8.2.1.jar:na]
 at java.lang.Thread.run(Thread.java:745) [na:1.8.0_31]


 I expect producer to respect timeout settings even in this connection lost
 scenario.

 Is this a known bug? Is there something I can do/configure as a workaround?

 Kind regards,
 Stevo Slavic.




-- 
Thanks,
Ewen


Re: New consumer - poll/seek javadoc confusing, need clarification

2015-07-21 Thread Ewen Cheslack-Postava
On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić ssla...@gmail.com wrote:

 Hello Apache Kafka community,

 I find new consumer poll/seek javadoc a bit confusing. Just by reading docs
 I'm not sure what the outcome will be, what is expected in following
 scenario:

 - kafkaConsumer is instantiated with auto-commit off
 - kafkaConsumer.subscribe(someTopic)
 - kafkaConsumer.position is called for every TopicPartition HLC is actively
 subscribed on

 and then when doing multiple poll calls in succession (without calling
 commit), does seek have to be called in between poll calls to position HLC
 to skip what was read in previous poll, or does HLC keep that state
 (position after poll) in memory, so that next poll (without seek in between
 two poll calls) will continue from where last poll stopped?


The position is tracked in-memory within the consumer, so as long as there
isn't a consumer rebalance, consumption will just proceed with subsequent
messages (i.e. the behavior I think most people would find intuitive).
However, if a rebalance occurs (another consumer instance joins the group
or some leave), then a partition may be assigned to a different consumer
instance that has no idea about the current position and will restart based
on the offset reset setting (because attempting to fetch the committed
offset will fail since no offsets have been committed).
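
As a rough sketch of that usage with the new consumer (API details were still
in flux at the time of writing, and props/process() below are placeholders for
your own configuration and record handling):

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("someTopic"));
while (true) {
    // each poll() continues from the in-memory position left by the last poll()
    ConsumerRecords<String, String> records = consumer.poll(100);
    process(records);
    // committing is only needed so the position survives a rebalance or restart
    consumer.commitSync();
}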

-Ewen


 Could be it's just me not understanding this from javadoc. If not, maybe
 javadoc can be improved to make this (even) more obvious.

 Kind regards,
 Stevo Slavic.




-- 
Thanks,
Ewen


Re: Retrieving lost messages produced while the consumer was down.

2015-07-21 Thread Ewen Cheslack-Postava
Since you mentioned consumer groups, I'm assuming you're using the high
level consumer? Do you have auto.commit.enable set to true?

It sounds like when you start up you are always getting the
auto.offset.reset behavior, which indicates you don't have any offsets
committed. By default, that behavior is to consume from the latest offset
(which would only get messages produced after the consumer starts).

To get the behavior you're looking for, you should make sure to commit
offsets when you're shutting down your consumer so it will resume where you
left off the next time you start it. Unless you are using the
SimpleConsumer, you shouldn't need to explicitly make any requests yourself.
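
With the high level consumer that just means committing before shutdown, along
these lines (a sketch only -- the classes live in kafka.consumer and
kafka.javaapi.consumer, and the config values are placeholders):

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "my-group");
props.put("auto.commit.enable", "true");
// auto.offset.reset only applies when no committed offset exists for the group
props.put("auto.offset.reset", "smallest");
ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
// ... consume from the streams ...
connector.commitOffsets();  // flush the final offsets before exiting
connector.shutdown();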


On Tue, Jul 21, 2015 at 2:24 PM, Tomas Niño Kehoe tomasninoke...@gmail.com
wrote:

 Hi,

 We've been using Kafka for a couple of months, and now we're trying to
 write a Simple application using the ConsumerGroup to fully understand
 Kafka.

 Having the producer continually writing data, our consumer occasionally
 needs to be restarted. However, once the program is brought back up,
 messages which were produced during that period of time are not being
 read. Instead, the consumer (this is a single consumer inside a Consumer
 group) will read the messages produced after it was brought back up.  Its
 configuration doesn't change at all.

 For example using the simple consumer/producer apps:

 Produced 1, 2, 3, 4, 5
 Consumed 1, 2, 3, 4, 5

 [Stop the consumer]
 Produce 20, 21, 22, 23

 When the consumer is brought back up, I'd like to get 20, 21, 22, 23, but I
 will only get either new messages, or all the messages using
 (--from-beginning).

 Is there a way of achieving this programmatically, without for example
 writing an offset into the zookeeper node? Is the OffsetCommitRequest the
 way to go?

 Thanks in advance


 Tomás




-- 
Thanks,
Ewen


Re: deleting data automatically

2015-07-24 Thread Ewen Cheslack-Postava
You'll want to set the log retention policy via
log.retention.{ms,minutes,hours} or log.retention.bytes. If you want really
aggressive collection (e.g., on the order of seconds, as you specified),
you might also need to adjust log.segment.bytes/log.roll.{ms,hours} and
log.retention.check.interval.ms.
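
For example, in server.properties (the numbers below are only meant to
illustrate which knobs exist -- tune them for your disks and workload):

# delete segments once they are ~10 seconds old or the partition exceeds ~50GB
log.retention.ms=10000
log.retention.bytes=53687091200
# roll segments frequently so old segments actually become eligible for deletion
log.segment.bytes=104857600
log.retention.check.interval.ms=5000

Keep in mind retention only removes whole segments, so the effective limits
are approximate.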

On Fri, Jul 24, 2015 at 12:49 PM, Yuheng Du yuheng.du.h...@gmail.com
wrote:

 Hi,

 I am testing the kafka producer performance. So I created a queue and
 writes a large amount of data to that queue.

 Is there a way to delete the data automatically after some time, say
 whenever the data size reaches 50GB or the retention time exceeds 10
 seconds, it will be deleted so my disk won't get filled and new data can't
 be written in?

 Thanks.!




-- 
Thanks,
Ewen


Re: Data Structure abstractions over kafka

2015-07-13 Thread Ewen Cheslack-Postava
Tim,

Kafka can be used as a key-value store if you turn on log compaction:
http://kafka.apache.org/documentation.html#compaction You need to be
careful with that since it's purely last-writer-wins and doesn't have
anything like CAS that might help you manage concurrent writers, but the
basic functionality is there. This is used by the brokers to store offsets
in Kafka (where keys are (consumer-group, topic, partition), values are the
offset, and they already have a mechanism to ensure only a single writer at
a time).
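
Roughly, you would create the topic with compaction enabled, e.g. (the topic
name and settings here are just an example, and depending on your version you
may also need log.cleaner.enable=true on the brokers):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic kv-store \
  --partitions 1 --replication-factor 1 --config cleanup.policy=compact

Every message produced with key K then acts as the current value for K, and
compaction eventually discards the older values for that key.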

You could possibly use this to implement the linked list functionality
you're talking about, although there are probably a number of challenges
(e.g., performing atomic updates if you need a doubly-linked list, ensuring
garbage is collected after removals even if you only need a singly-linked
list, etc). Also, I'm not sure it would be particularly efficient, you'd
still need to ensure a single writer (or at least single writer per linked
list node), etc.

You're almost definitely better off using a specialized store for something
like that simply because Kafka isn't designed around that use case, but
it'd be interesting to see how far you could get with Kafka's current
functionality, and what would be required to make it practical!

-Ewen

On Mon, Jul 13, 2015 at 11:36 AM, Tim Smith secs...@gmail.com wrote:

 Hi,

 In the big data ecosystem, I have started to use kafka, essentially, as a:
 -  unordered list/array, and
 - a cluster-wide pipe

 I guess you could argue that any message bus product is a simple array/pipe
 but kafka's scale and model make things so easy :)

 I am wondering if there are any abstractions on top of kafka that will let
 me use kafka to store/organize other simple data structures like a
 linked-list? I have a use case for massive linked list that can easily grow
 to tens of gigabytes and could easily use - (1) redundancy (2) multiple
 producers/consumers working on processing the list (implemented over spark,
 storm etc).

 Any ideas? Maybe maintain a linked-list of offsets in another store like
 ZooKeeper or a NoSQL DB while store the messages on kafka?

 Thanks,

 - Tim




-- 
Thanks,
Ewen


Re: kafka benchmark tests

2015-07-13 Thread Ewen Cheslack-Postava
I implemented (nearly) the same basic set of tests in the system test
framework we started at Confluent and that is going to move into Kafka --
see the wip patch for KIP-25 here: https://github.com/apache/kafka/pull/70
In particular, that test is implemented in benchmark_test.py:
https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8

Hopefully once that's merged people can reuse that benchmark (and add to
it!) so they can easily run the same benchmarks across different hardware.
Here are some results from an older version of that test on m3.2xlarge
instances on EC2 using local ephemeral storage (I think... it's been awhile
since I ran these numbers and I didn't document methodology that carefully):

INFO:_.KafkaBenchmark:=
INFO:_.KafkaBenchmark:BENCHMARK RESULTS
INFO:_.KafkaBenchmark:=
INFO:_.KafkaBenchmark:Single producer, no replication: 684097.470208
rec/sec (65.24 MB/s)
INFO:_.KafkaBenchmark:Single producer, async 3x replication:
667494.359673 rec/sec (63.66 MB/s)
INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
116485.764275 rec/sec (11.11 MB/s)
INFO:_.KafkaBenchmark:Three producers, async 3x replication:
1696519.022182 rec/sec (161.79 MB/s)
INFO:_.KafkaBenchmark:Message size:
INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec (65.30 MB/s)
INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec (56.830500 MB/s)
INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec (267.830800 MB/s)
INFO:_.KafkaBenchmark:Producer + consumer:
INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60 MB/s)
INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60 MB/s)
INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99%
4.00 ms, 99.9% 19.00 ms

Don't trust these numbers for anything, the were a quick one-off test. I'm
just pasting the output so you get some idea of what the results might look
like. Once we merge the KIP-25 patch, Confluent will be running the tests
regularly and results will be available publicly so we'll be able to keep
better tabs on performance, albeit for only a specific class of hardware.

For the batch.size question -- I'm not sure the results in the blog post
actually have different settings, it could be accidental divergence between
the script and the blog post. The post specifically notes that tuning the
batch size in the synchronous case might help, but that he didn't do that.
If you're trying to benchmark the *optimal* throughput, tuning the batch
size would make sense. Since synchronous replication will have higher
latency and there's a limit to how many requests can be in flight at once,
you'll want a larger batch size to compensate for the additional latency.
However, in practice the increase you see may be negligible. Somebody who
has spent more time fiddling with tweaking producer performance may have
more insight.

-Ewen

On Mon, Jul 13, 2015 at 10:08 AM, JIEFU GONG jg...@berkeley.edu wrote:

 Hi all,

 I was wondering if any of you guys have done benchmarks on Kafka
 performance before, and if they or their details (# nodes in cluster, #
 records / size(s) of messages, etc.) could be shared.

 For comparison purposes, I am trying to benchmark Kafka against some
 similar services such as Kinesis or Scribe. Additionally, I was wondering
 if anyone could shed some insight on Jay Kreps' benchmarks that he has
 openly published here:

 https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

 Specifically, I am unsure of why between his tests of 3x synchronous
 replication and 3x async replication he changed the batch.size, as well as
 why he is seemingly publishing to incorrect topics:

 Configs:
 https://gist.github.com/jkreps/c7ddb4041ef62a900e6c

 Any help is greatly appreciated!



 --

 Jiefu Gong
 University of California, Berkeley | Class of 2017
 B.A Computer Science | College of Letters and Sciences

 jg...@berkeley.edu elise...@berkeley.edu | (925) 400-3427




-- 
Thanks,
Ewen


Re: resources for simple consumer?

2015-07-15 Thread Ewen Cheslack-Postava
Hi Jeff,

The simple consumer hasn't really changed, the info you found should still
be relevant. The wiki page on it might be the most useful reference for
getting started:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
And if you want a version all setup to compile and run, we've included it
in Confluent's examples repo: https://github.com/confluentinc/examples

On Wed, Jul 15, 2015 at 4:03 PM, Jeff Gong j.gon...@gmail.com wrote:

 hey all,

 typically i've only ever had to use the high level consumer for my personal
 needs when handling data. recently, however, I have found the need to be
 more selective and careful with managing offsets and want the extended
 capability to do so. i know that there is a bit of documentation on a
 simple consumer example online, but that seems to be for version 0.8.0 and
 i am not sure if any of those features are deprecated?

 does anyone have any other resources / repos / etc that they found helpful
 for diving into simple consumer usage? looks like there are a lot of scala
 classes to dig into if i wanted to read the source code. thanks!




-- 
Thanks,
Ewen


Re: Kafka partitioning is pretty much broken

2015-07-15 Thread Ewen Cheslack-Postava
Also worth mentioning is that the new producer doesn't have this behavior
-- it will round robin over available partitions for records without keys.
Available means it currently has a leader -- under normal cases this
means it distributes evenly across all partitions, but if a partition is
down temporarily it will just avoid it. It's highly recommended you use the
new producer anyway since it comes with a lot of other improvements as well.
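
For example (a minimal sketch -- the broker address and serializer choices are
placeholders, and the classes are in org.apache.kafka.clients.producer):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
// no key supplied, so sends get spread over the available partitions
producer.send(new ProducerRecord<String, String>("my-topic", "some value"));
producer.close();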

-Ewen

On Wed, Jul 15, 2015 at 4:57 PM, Lance Laursen llaur...@rubiconproject.com
wrote:

 From the FAQ:

 To reduce # of open sockets, in 0.8.0 (
 https://issues.apache.org/jira/browse/KAFKA-1017), when the partitioning
 key is not specified or null, a producer will pick a random partition and
 stick to it for some time (default is 10 mins) before switching to another
 one. So, if there are fewer producers than partitions, at a given point of
 time, some partitions may not receive any data. To alleviate this problem,
 one can either reduce the metadata refresh interval or specify a message
 key and a customized random partitioner. For more detail see this thread

 http://mail-archives.apache.org/mod_mbox/kafka-dev/201310.mbox/%3CCAFbh0Q0aVh%2Bvqxfy7H-%2BMnRFBt6BnyoZk1LWBoMspwSmTqUKMg%40mail.gmail.com%3E
 


 On Wed, Jul 15, 2015 at 4:13 PM, Stefan Miklosovic mikloso...@gmail.com
 wrote:

  Maybe there is some reason why a producer sticks with a partition for
  some period of time - mostly performance related. I can imagine that
  constant switching between partitions can be kind of slow in such
  sense that producer has to refocus on another partition to send a
  message to and this switching may cost something so switching happens
  sporadically.
 
  On the other hand, I would never expect such behaviour I encountered.
  If it is once propagated as "random", I expect that it is really
  random and not "random but not random every time". It is hard to
  figure out these information, the only way seems to be to try all
  other solutions and the most awkward one you would never expect to
  work is actually the proper one ...
 
  On Thu, Jul 16, 2015 at 12:53 AM, JIEFU GONG jg...@berkeley.edu wrote:
   This is a total shot in the dark here so please ignore this if it fails
  to
   make sense, but I remember that on some previous implementation of the
   producer prior to when round-robin was enabled, producers would send
   messages to only one of the partitions for a set period of time
   (configurable, I believe) before moving onto the next one. This caused
  me a
   similar grievance as I would notice only a few of my consumers would
 get
   data while others were completely idle.
  
   Sounds similar, so check if that's a possibility at all?
  
   On Wed, Jul 15, 2015 at 3:04 PM, Jagbir Hooda jho...@gmail.com
 wrote:
  
   Hi Stefan,
  
   Have you looked at the following output for message distribution
   across the topic-partitions and which topic-partition is consumed by
   which consumer thread?
  
    kafka-server/bin/kafka-run-class.sh
   kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group
   consumer_group_name
  
   Jagbir
  
   On Wed, Jul 15, 2015 at 12:50 PM, Stefan Miklosovic
   mikloso...@gmail.com wrote:
I have following problem, I tried almost everything I could but
  without
   any luck
   
All I want to do is to have 1 producer, 1 topic, 10 partitions and
 10
   consumers.
   
All I want is to send 1M of messages via producer to these 10
  consumers.
   
I am using built Kafka 0.8.3 from current upstream so I have
 bleeding
edge stuff. It does not work on 0.8.1.1 nor 0.8.2 stream.
   
The problem I have is that I expect that when I send 1 milion of
messages via that producer, I will have all consumers busy. In other
words, if a message to be sent via producer is sent to partition
randomly (roundrobin / range), I expect that all 10 consumers will
process about 100k of messages each because producer sends it to
random partition of these 10.
   
But I have never achieved such outcome.
   
I was trying these combinations:
   
1) old scala producer vs old scala consumer
   
Consumer was created by Consumers.createJavaConsumer() ten times.
Every consumer is running in the separate thread.
   
2) old scala producer vs new java consumer
   
new consumer was used like I have 10 consumers listening for a topic
and 10 consumers subscribed to 1 partition. (consumer 1 - partition
 1,
consumer 2 - paritition 2 and so on)
   
3) old scala producer with custom partitioner
   
I even tried to use my own partitioner, I just generated a random
number from 0 to 9 so I expected that the messages will be sent
randomly to the partition of that number.
   
All I see is that there are only couple of consumers from these 10
utilized, even I am sending 1M of messages, all I got from the
debugging output is some preselected set of consumers which appear
 

Re: latency performance test

2015-07-15 Thread Ewen Cheslack-Postava
The tests are meant to evaluate different things and the way they send
messages is the source of the difference.

EndToEndLatency works with a single message at a time. It produces the
message then waits for the consumer to receive it. This approach guarantees
there is no delay due to queuing. The goal with this test is to evaluate
the *minimum* latency.

ProducerPerformance focuses on achieving maximum throughput. This means it
will enqueue lots of records so it will always have more data to send (and
can use batching to increase the throughput). Unlike EndToEndLatency, this
means records may just sit in a queue on the producer for awhile because
the maximum number of in flight requests has been reached and it needs to
wait for responses for those requests. Since EndToEndLatency only ever has
one record outstanding, it will never encounter this case.

Batching itself doesn't increase the latency because it only occurs when
the producer is either a) already unable to send messages anyway or b)
linger.ms is greater than 0, but the tests use the default setting that
doesn't linger at all.

In your example for ProducerPerformance, you have 100 byte records and will
buffer up to 64MB. Given the batch size of 8K and default producer settings
of 5 in flight requests, you can roughly think of one round trip time
handling 5 * 8K = 40K bytes of data. If your roundtrip is 1ms, then if your
buffer is full at 64MB it will take you 64 MB / (40 KB/ms) = 1638ms = 1.6s.
That means that the record that was added at the end of the buffer had to
just sit in the buffer for 1.6s before it was sent off to the broker. And
if your buffer is consistently full (which it should be for
ProducerPerformance since it's sending as fast as it can), that means
*every* record waits that long.

Of course, these numbers are estimates, depend on my having used 1ms, but
hopefully should make it clear why you can see relatively large latencies.
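
For reference, the producer settings in play here are (values taken from your
command plus the defaults discussed above):

buffer.memory=67108864
batch.size=8196
linger.ms=0
max.in.flight.requests.per.connection=5

Shrinking buffer.memory (or raising batch.size / the in-flight limit) changes
how long a record can sit queued before it is actually sent.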

-Ewen


On Wed, Jul 15, 2015 at 1:38 AM, Yuheng Du yuheng.du.h...@gmail.com wrote:

 Hi,

 I have run the end to end latency test and the producerPerformance test on
 my kafka cluster according to
 https://gist.github.com/jkreps/c7ddb4041ef62a900e6c

 In end to end latency test, the latency was around 2ms. In
 producerperformance test, if I use batch size 8196 to send 50,000,000
 records:

 bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
 speedx1 5000 100 -1 acks=1 bootstrap.servers=192.168.1.1:9092
 buffer.memory=67108864 batch.size=8196


 The results show that max latency is 3617ms, avg latency 626.7ms. I wanna
 know why the latency in producerperformance test is significantly larger
 than end to end test? Is it because of batching? Are the definitons of
 these two latencies different? I looked at the source code and I believe
 the latency is measure for the producer.send() function to complete. So
 does this latency includes transmission delay, transferring delay, and what
 other components?


 Thanks.


 best,

 Yuheng




-- 
Thanks,
Ewen


Re: kafka benchmark tests

2015-07-14 Thread Ewen Cheslack-Postava
@Jiefu, yes! The patch is functional, I think it's just waiting on a bit of
final review after the last round of changes. You can definitely use it for
your own benchmarking, and we'd love to see patches for any additional
tests we missed in the first pass!

-Ewen

On Tue, Jul 14, 2015 at 10:53 AM, JIEFU GONG jg...@berkeley.edu wrote:

 Yuheng,
 I would recommend looking here:
 http://kafka.apache.org/documentation.html#brokerconfigs and scrolling
 down
 to get a better understanding of the default settings and what they mean --
 it'll tell you what different options for acks does.

 Ewen,
 Thank you immensely for your thoughts, they shed a lot of insight into the
 issue. Though it is understandable that your specific results need to be
 verified, it seems that the KIP-25 patch is functional and I can use it for
 my own benchmarking purposes? Is that correct? Thanks again!

 On Tue, Jul 14, 2015 at 8:22 AM, Yuheng Du yuheng.du.h...@gmail.com
 wrote:

  Also, I guess setting the target throughput to -1 means let it be as high
  as possible?
 
  On Tue, Jul 14, 2015 at 10:36 AM, Yuheng Du yuheng.du.h...@gmail.com
  wrote:
 
   Thanks. If I set the acks=1 in the producer config options in
   bin/kafka-run-class.sh
 org.apache.kafka.clients.tools.ProducerPerformance
   test7 5000 100 -1 acks=1 bootstrap.servers=
   esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
  batch.size=8196?
  
   Does that mean for each message generated at the producer, the producer
   will wait until the broker sends the ack back, then send another
 message?
  
   Thanks.
  
   Yuheng
  
   On Tue, Jul 14, 2015 at 10:06 AM, Manikumar Reddy 
 ku...@nmsworks.co.in
   wrote:
  
   Yes, A list of  Kafka Server host/port pairs to use for establishing
 the
   initial connection to the Kafka cluster
  
   https://kafka.apache.org/documentation.html#newproducerconfigs
  
   On Tue, Jul 14, 2015 at 7:29 PM, Yuheng Du yuheng.du.h...@gmail.com
   wrote:
  
Does anyone know what is bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 means in the following test
  command:
   
bin/kafka-run-class.sh
   org.apache.kafka.clients.tools.ProducerPerformance
test7 5000 100 -1 acks=1 bootstrap.servers=
esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864
   batch.size=8196?
   
what is bootstrap.servers? Is it the kafka server that I am running
 a
   test
at?
   
Thanks.
   
Yuheng
   
   
   
   
On Tue, Jul 14, 2015 at 12:18 AM, Ewen Cheslack-Postava 
   e...@confluent.io

wrote:
   
 I implemented (nearly) the same basic set of tests in the system
  test
 framework we started at Confluent and that is going to move into
   Kafka --
 see the wip patch for KIP-25 here:
https://github.com/apache/kafka/pull/70
 In particular, that test is implemented in benchmark_test.py:


   
  
 
 https://github.com/apache/kafka/pull/70/files#diff-ca984778cf9943407645eb6784f19dc8

 Hopefully once that's merged people can reuse that benchmark (and
  add
   to
 it!) so they can easily run the same benchmarks across different
hardware.
 Here are some results from an older version of that test on
  m3.2xlarge
 instances on EC2 using local ephemeral storage (I think... it's
 been
awhile
 since I ran these numbers and I didn't document methodology that
 carefully):

 INFO:_.KafkaBenchmark:=
 INFO:_.KafkaBenchmark:BENCHMARK RESULTS
 INFO:_.KafkaBenchmark:=
 INFO:_.KafkaBenchmark:Single producer, no replication:
 684097.470208
 rec/sec (65.24 MB/s)
 INFO:_.KafkaBenchmark:Single producer, async 3x replication:
 667494.359673 rec/sec (63.66 MB/s)
 INFO:_.KafkaBenchmark:Single producer, sync 3x replication:
 116485.764275 rec/sec (11.11 MB/s)
 INFO:_.KafkaBenchmark:Three producers, async 3x replication:
 1696519.022182 rec/sec (161.79 MB/s)
 INFO:_.KafkaBenchmark:Message size:
 INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.62 MB/s)
 INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.75 MB/s)
 INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.17 MB/s)
 INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.21 MB/s)
 INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.31 MB/s)
 INFO:_.KafkaBenchmark:Throughput over long run, data > memory:
 INFO:_.KafkaBenchmark: Time block 0: 684725.151324 rec/sec
  (65.30
MB/s)
 INFO:_.KafkaBenchmark:Single consumer: 701031.14 rec/sec
   (56.830500
 MB/s)
 INFO:_.KafkaBenchmark:Three consumers: 3304011.014900 rec/sec
   (267.830800
 MB/s)
 INFO:_.KafkaBenchmark:Producer + consumer:
 INFO:_.KafkaBenchmark: Producer: 624984.375391 rec/sec (59.60
   MB/s)
 INFO:_.KafkaBenchmark: Consumer: 624984.375391 rec/sec (59.60
   MB/s)
 INFO:_.KafkaBenchmark:End-to-end latency: median 2.00 ms, 99%
 4.00

Re: Kafka brokers on HTTP port

2015-07-16 Thread Ewen Cheslack-Postava
Yes, those are potential issues if you make your Kafka cluster publicly
accessible. There are some features currently being worked on that could
help address these problems (some security on connections and quotas), but
they are still in progress. You'll probably want a proxy layer to handle
this.

As for auto-scaling, you can do this but you should be aware that Kafka
doesn't currently do automatic balancing of data as brokers are added.
You'll need to manage that process yourself when new brokers are added.
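
The tool for that is kafka-reassign-partitions.sh, roughly (the JSON below is
just an illustration of the format):

cat > reassign.json <<'EOF'
{"version":1,"partitions":[{"topic":"my-topic","partition":0,"replicas":[3,4]}]}
EOF
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --execute

It also has --generate to propose an assignment across a set of brokers and
--verify to check on progress.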

-Ewen

On Thu, Jul 16, 2015 at 7:35 PM, Chandrashekhar Kotekar 
shekhar.kote...@gmail.com wrote:

 Thanks  a lot Ewen and Edward for your valuable answers. According to new
 update from admin side, they can allow TCP only connections on Kafka
 brokers.

 Now another problem is that we want to keep Kafka brokers in AWS so that
 kafka brokers can be auto scaled in/out. As most of instances in AWS do not
 have public IP addresses, first we have to assign public IP to all Kafka
 brokers and keep them visible over internet.

 I would like to know if there will be any security issue like DoS attack or
 malicious user sending Kafka messages or something like that?


 Thanks,
 Chandrash3khar Kotekar
 Mobile - +91 8600011455

 On Fri, Jul 17, 2015 at 4:57 AM, Ewen Cheslack-Postava e...@confluent.io
 wrote:

  Chandrashekhar,
 
  If the firewall rules allow any TCP connection on those ports, you can
 just
  use Kafka directly and change the default port. If they actually verify
  that it's HTTP traffic then you'd have to use the REST Proxy Edward mentioned
 or
  another HTTP-based proxy.
 
  -Ewen
 
  On Thu, Jul 16, 2015 at 9:23 AM, Edward Ribeiro 
 edward.ribe...@gmail.com
  wrote:
 
   Maybe what you are looking for is Kafka REST Proxy:
   http://docs.confluent.io/1.0/kafka-rest/docs/intro.html
  
   Edward
  
   On Thu, Jul 16, 2015 at 10:24 AM, Chandrashekhar Kotekar 
   shekhar.kote...@gmail.com wrote:
  
Hi,
   
In my project Kafka producers won't be in the same network of Kafka
   brokers
and due to security reasons other ports are blocked.
   
I would like to know if it is possible to run Kafka brokers on HTTP
  port
(8080) so that Kafka producers will send Kafka messages over HTTP and
brokers can store them until consumers consume them.
   
I tried to search for this type of question in mailing list but
  couldn't
find exact question/answer. Sorry if this is duplicate question.
   
Thanks,
Chandrash3khar Kotekar
Mobile - +91 8600011455
   
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: Kafka brokers on HTTP port

2015-07-16 Thread Ewen Cheslack-Postava
Chandrashekhar,

If the firewall rules allow any TCP connection on those ports, you can just
use Kafka directly and change the default port. If they actually verify
that it's HTTP traffic then you'd have to use the REST Proxy Edward mentioned
or another HTTP-based proxy.
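
Changing the port is just a broker config change in server.properties, e.g.
(8080 only because that's what your firewall permits):

port=8080

plus advertised.host.name/advertised.port if clients reach the brokers through
a NAT or load balancer.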

-Ewen

On Thu, Jul 16, 2015 at 9:23 AM, Edward Ribeiro edward.ribe...@gmail.com
wrote:

 Maybe what you are looking for is Kafka REST Proxy:
 http://docs.confluent.io/1.0/kafka-rest/docs/intro.html

 Edward

 On Thu, Jul 16, 2015 at 10:24 AM, Chandrashekhar Kotekar 
 shekhar.kote...@gmail.com wrote:

  Hi,
 
  In my project Kafka producers won't be in the same network of Kafka
 brokers
  and due to security reasons other ports are blocked.
 
  I would like to know if it is possible to run Kafka brokers on HTTP port
  (8080) so that Kafka producers will send Kafka messages over HTTP and
  brokers can store them until consumers consume them.
 
  I tried to search for this type of question in mailing list but couldn't
  find exact question/answer. Sorry if this is duplicate question.
 
  Thanks,
  Chandrash3khar Kotekar
  Mobile - +91 8600011455
 




-- 
Thanks,
Ewen


Re: latency performance test

2015-07-16 Thread Ewen Cheslack-Postava
That's a good bet. Running with a few different buffer sizes would verify
this. Adjusting other settings (e.g. max in flight requests, as mentioned
earlier) could also affect latency and throughput.

-Ewen

On Thu, Jul 16, 2015 at 5:20 AM, Yuheng Du yuheng.du.h...@gmail.com wrote:

 Hi Ewen,

 Thank you for your patient explaining. It is very helpful.

 Can we assume that the long latency of ProducerPerformance comes from
 queuing delay in the buffer and it is related to buffer size?

 Thank you!

 best,
 Yuheng

 On Thu, Jul 16, 2015 at 12:21 AM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  The tests are meant to evaluate different things and the way they send
  messages is the source of the difference.
 
  EndToEndLatency works with a single message at a time. It produces the
  message then waits for the consumer to receive it. This approach
 guarantees
  there is no delay due to queuing. The goal with this test is to evaluate
  the *minimum* latency.
 
  ProducerPerformance focuses on achieving maximum throughput. This means
 it
  will enqueue lots of records so it will always have more data to send
 (and
  can use batching to increase the throughput). Unlike EndToEndLatency,
 this
  means records may just sit in a queue on the producer for awhile because
  the maximum number of in flight requests has been reached and it needs to
  wait for responses for those requests. Since EndToEndLatency only ever
 has
  one record outstanding, it will never encounter this case.
 
  Batching itself doesn't increase the latency because it only occurs when
  the producer is either a) already unable to send messages anyway or b)
  linger.ms is greater than 0, but the tests use the default setting that
  doesn't linger at all.
 
  In your example for ProducerPerformance, you have 100 byte records and
 will
  buffer up to 64MB. Given the batch size of 8K and default producer
 settings
  of 5 in flight requests, you can roughly think of one round trip time
  handling 5 * 8K = 40K bytes of data. If your roundtrip is 1ms, then if
 your
  buffer is full at 64MB it will take you 64 MB / (40 KB/ms) = 1638ms =
 1.6s.
  That means that the record that was added at the end of the buffer had to
  just sit in the buffer for 1.6s before it was sent off to the broker. And
  if your buffer is consistently full (which it should be for
  ProducerPerformance since it's sending as fast as it can), that means
  *every* record waits that long.
 
  Of course, these numbers are estimates, depend on my having used 1ms, but
  hopefully should make it clear why you can see relatively large
 latencies.
 
  -Ewen
 
 
  On Wed, Jul 15, 2015 at 1:38 AM, Yuheng Du yuheng.du.h...@gmail.com
  wrote:
 
   Hi,
  
   I have run the end to end latency test and the producerPerformance test
  on
   my kafka cluster according to
   https://gist.github.com/jkreps/c7ddb4041ef62a900e6c
  
   In end to end latency test, the latency was around 2ms. In
   producerperformance test, if I use batch size 8196 to send 50,000,000
   records:
  
   bin/kafka-run-class.sh
  org.apache.kafka.clients.tools.ProducerPerformance
   speedx1 5000 100 -1 acks=1 bootstrap.servers=192.168.1.1:9092
   buffer.memory=67108864 batch.size=8196
  
  
   The results show that max latency is 3617ms, avg latency 626.7ms. I
 wanna
   know why the latency in producerperformance test is significantly
 larger
   than end to end test? Is it because of batching? Are the definitons of
   these two latencies different? I looked at the source code and I
 believe
   the latency is measure for the producer.send() function to complete. So
   does this latency includes transmission delay, transferring delay, and
  what
   other components?
  
  
   Thanks.
  
  
   best,
  
   Yuheng
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: [VOTE] 0.9.0.0 Candidate 1

2015-11-10 Thread Ewen Cheslack-Postava
Jun, not sure if this is just because of the RC vs being published on the
site, but the links in the release notes aren't pointing to
issues.apache.org. They're relative URLs instead of absolute.

-Ewen

On Tue, Nov 10, 2015 at 3:38 AM, Flavio Junqueira  wrote:

> -1 (non-binding)
>
> I'm getting an error with gradle when using the source artifact because it
> seems to be expecting a git repository here:
>
> line 68 of build.gradle: def repo = Grgit.open(project.file('.'))
>
> and I get this message:
> FAILURE: Build failed with an exception.
>
> * Where:
> Build file 'kafka-0.9.0.0-src/build.gradle' line: 68
>
> * What went wrong:
> A problem occurred evaluating root project 'kafka-0.9.0.0-src'.
> > repository not found: kafka-0.9.0.0-src
>
> The definitions for rat make sense when working on a git branch, but not
> for the release artifact. One
> way around this is to disable rat by commenting out the corresponding
> lines, but that isn't what the
> README file says. I'd rather have an RC that fixes this issue by possibly
> disabling rat altogether.
>
> -Flavio
>
> > On 10 Nov 2015, at 07:17, Jun Rao  wrote:
> >
> > This is the first candidate for release of Apache Kafka 0.9.0.0. This is a
> > major release that includes (1) authentication (through SSL and SASL) and
> > authorization, (2) a new java consumer, (3) a Kafka connect framework for
> > data ingestion and egression, and (4) quotas. Since this is a major
> > release, we will give people a bit more time for trying this out.
> >
> > Release Notes for the 0.9.0.0 release
> >
> https://people.apache.org/~junrao/kafka-0.9.0.0-candidate1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Thursday, Nov. 19, 11pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS in addition to the md5, sha1
> > and sha2 (SHA256) checksum.
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://people.apache.org/~junrao/kafka-0.9.0.0-candidate1/
> >
> > * Maven artifacts to be voted upon prior to release:
> > https://repository.apache.org/content/groups/staging/
> >
> > * scala-doc
> > https://people.apache.org/~junrao/kafka-0.9.0.0-candidate1/scaladoc/
> >
> > * java-doc
> > https://people.apache.org/~junrao/kafka-0.9.0.0-candidate1/javadoc/
> >
> > * The tag to be voted upon (off the 0.9.0 branch) is the 0.9.0.0 tag
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=6cee4f38aba612209b0a8171736c6e2948c35b6f
> >
> > * Documentation
> > http://kafka.apache.org/090/documentation.html
> >
> > /***
> >
> > Thanks,
> >
> > Jun
>
>


-- 
Thanks,
Ewen


Re: Ephemeral ports for Kafka broker

2015-11-11 Thread Ewen Cheslack-Postava
Passing 0 as the port should let you do this. This is how we get the tests
to work without assuming a specific port is available. The
KafkaServer.boundPort(SecurityProtocol) method can be used to get the port
that was bound.
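
So an embedded broker can look roughly like this (a sketch -- it assumes you
have already built and started the KafkaServer from a config with port=0):

// ask the server which port the OS actually assigned
int actualPort = kafkaServer.boundPort(SecurityProtocol.PLAINTEXT);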

-Ewen

On Tue, Nov 10, 2015 at 11:23 PM, Hemanth Yamijala 
wrote:

> Hello,
>
> Is there any way to specify that the Kafka broker should bind to any
> available port. This could be used, maybe for an embedded Kafka instance,
> etc.
>
> Thanks
> Hemanth
>



-- 
Thanks,
Ewen


Re: kafka connect(copycat) question

2015-11-10 Thread Ewen Cheslack-Postava
Hi Venkatesh,

If you're using the default settings included in the sample configs, it'll
expect JSON data in a special format to support passing schemas along with
the data. This is turned on by default because it makes it possible to work
with a *lot* more connectors and data storage systems (many require
schemas!), though it does mean consuming regular JSON data won't work out
of the box. You can easily switch this off by changing these lines in the
worker config:

key.converter.schemas.enable=true
value.converter.schemas.enable=true

to be false instead. However, note that this will only work with connectors
that can work with "schemaless" data. This wouldn't work for, e.g., writing
Avro files in HDFS since they need schema information, but it might work
for other formats. This would allow you to consume JSON data from any topic
it already existed in.
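
For reference, with schemas enabled the JsonConverter expects each message to
carry a schema/payload envelope, roughly of this shape (example only):

{"schema": {"type": "string", "optional": false}, "payload": "hello"}

whereas with schemas.enable=false it will accept plain JSON values as-is.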

Note that JSON is not the only format you can use. You can also substitute
other implementations of the Converter interface. Confluent has implemented
an Avro version that works well with our schema registry (
https://github.com/confluentinc/schema-registry/tree/master/avro-converter).
The JSON implementation made sense to add as the one included with Kafka
simply because it didn't introduce any other dependencies that weren't
already in Kafka. It's also possible to write implementations for other
formats (e.g. Thrift, Protocol Buffers, Cap'n Proto, MessagePack, and
more), but I'm not aware of anyone who has started to tackle those
converters yet.

-Ewen

On Tue, Nov 10, 2015 at 1:23 PM, Venkatesh Rudraraju <
venkatengineer...@gmail.com> wrote:

> Hi,
>
> I am trying out the new kakfa connect service.
>
> version : kafka_2.11-0.9.0.0
> mode: standalone
>
> I have a conceptual question on the service.
>
> Can I just start a sink connector which reads from Kafka and writes to say
> HDFS ?
> From what I have tried, it's expecting a source-connector as well because
> the sink-connector is expecting a particular pattern of the message in
> kafka-topic.
>
> Thanks,
> Venkat
>



-- 
Thanks,
Ewen


Re: kafka connect(copycat) question

2015-11-12 Thread Ewen Cheslack-Postava
Yes, though it's still awaiting some updates after some renaming and API
modifications that happened in Kafka recently.

-Ewen

On Thu, Nov 12, 2015 at 9:10 AM, Venkatesh Rudraraju <
venkatengineer...@gmail.com> wrote:

> Ewen,
>
> How do I use a HDFSSinkConnector. I see the sink as part of a confluent
> project (
>
> https://github.com/confluentinc/copycat-hdfs/blob/master/src/main/java/io/confluent/copycat/hdfs/HdfsSinkConnector.java
> ).
> Does it mean that I build this project and add the jar to kafka libs ?
>
>
>
>
> On Tue, Nov 10, 2015 at 9:35 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > Venkatesh,
> >
> > 1. It only works with quotes because the message needs to be parsed as
> JSON
> > -- a bare string without quotes is not valid JSON. If you're just using a
> > file sink, you can also try the StringConverter, which only supports
> > strings and uses a fixed schema, but is also very easy to use since it
> has
> > minimal requirements. It's really meant for demonstration purposes more
> > than anything else, but may be helpful just to get up and running.
> > 2. Which JsonParser error? When processing a message fails, we need to be
> > careful about how we handle it. Currently it will not proceed if it can't
> > process a message since for a lot of applications it isn't acceptable to
> > drop messages. By default, we want at least once semantics, with exactly
> > once as long as we don't encounter any crashes or network errors. Manual
> > intervention is currently required in that case.
> >
> > -Ewen
> >
> > On Tue, Nov 10, 2015 at 8:58 PM, Venkatesh Rudraraju <
> > venkatengineer...@gmail.com> wrote:
> >
> > > Hi Ewen,
> > >
> > > Thanks for the explanation. with your suggested setting, I was able to
> > > start just a sink connector like below :
> > >
> > > >* bin/connect-standalone.sh config/connect-standalone.properties
> > > config/connect-file-sink.properties*
> > >
> > > But I have a couple of issues yet,
> > > 1) Since I am only testing a simple file sink connector, I am manually
> > > producing some messages to the 'connect-test' kafka topic, where the
> > > sink-Task is reading from. And it works only if the message is within
> > > double-quotes.
> > > 2) Once I hit the above JsonParser error on the SinkTask, the connector
> > is
> > > hung, doesn't take any more messages even proper ones.
> > >
> > >
> > > On Tue, Nov 10, 2015 at 1:59 PM, Ewen Cheslack-Postava <
> > e...@confluent.io>
> > > wrote:
> > >
> > > > Hi Venkatesh,
> > > >
> > > > If you're using the default settings included in the sample configs,
> > > it'll
> > > > expect JSON data in a special format to support passing schemas along
> > > with
> > > > the data. This is turned on by default because it makes it possible
> to
> > > work
> > > > with a *lot* more connectors and data storage systems (many require
> > > > schemas!), though it does mean consuming regular JSON data won't work
> > out
> > > > of the box. You can easily switch this off by changing these lines in
> > the
> > > > worker config:
> > > >
> > > > key.converter.schemas.enable=true
> > > > value.converter.schemas.enable=true
> > > >
> > > > to be false instead. However, note that this will only work with
> > > connectors
> > > > that can work with "schemaless" data. This wouldn't work for, e.g.,
> > > writing
> > > > Avro files in HDFS since they need schema information, but it might
> > work
> > > > for other formats. This would allow you to consume JSON data from any
> > > topic
> > > > it already existed in.
> > > >
> > > > Note that JSON is not the only format you can use. You can also
> > > substitute
> > > > other implementations of the Converter interface. Confluent has
> > > implemented
> > > > an Avro version that works well with our schema registry (
> > > >
> > >
> >
> https://github.com/confluentinc/schema-registry/tree/master/avro-converter
> > > > ).
> > > > The JSON implementation made sense to add as the one included with
> > Kafka
> > > > simply because it didn't introduce any other dependencies that
> weren't
> > > > already in Kafka. It's also possible to write implementations for
>

Re: kafka connect(copycat) question

2015-11-16 Thread Ewen Cheslack-Postava
Sorry, there was an out of date reference in the pom.xml, the version on
master should build fine now.

-Ewen

On Sat, Nov 14, 2015 at 1:54 PM, Venkatesh Rudraraju <
venkatengineer...@gmail.com> wrote:

> I tried building copycat-hdfs but it's not able to pull dependencies from
> maven...
>
> error trace :
> ---
>  Failed to execute goal on project kafka-connect-hdfs: Could not resolve
> dependencies for project
> io.confluent:kafka-connect-hdfs:jar:2.0.0-SNAPSHOT: The following artifacts
> could not be resolved: org.apache.kafka:connect-api:jar:0.9.0.0,
> io.confluent:kafka-connect-avro-converter:jar:2.0.0-SNAPSHOT,
> io.confluent:common-config:jar:2.0.0-SNAPSHOT: Could not find artifact
> org.apache.kafka:connect-api:jar:0.9.0.0 in confluent
>
> On Thu, Nov 12, 2015 at 2:59 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > Yes, though it's still awaiting some updates after some renaming and API
> > modifications that happened in Kafka recently.
> >
> > -Ewen
> >
> > On Thu, Nov 12, 2015 at 9:10 AM, Venkatesh Rudraraju <
> > venkatengineer...@gmail.com> wrote:
> >
> > > Ewen,
> > >
> > > How do I use a HDFSSinkConnector. I see the sink as part of a confluent
> > > project (
> > >
> > >
> >
> https://github.com/confluentinc/copycat-hdfs/blob/master/src/main/java/io/confluent/copycat/hdfs/HdfsSinkConnector.java
> > > ).
> > > Does it mean that I build this project and add the jar to kafka libs ?
> > >
> > >
> > >
> > >
> > > On Tue, Nov 10, 2015 at 9:35 PM, Ewen Cheslack-Postava <
> > e...@confluent.io>
> > > wrote:
> > >
> > > > Venkatesh,
> > > >
> > > > 1. It only works with quotes because the message needs to be parsed
> as
> > > JSON
> > > > -- a bare string without quotes is not valid JSON. If you're just
> > using a
> > > > file sink, you can also try the StringConverter, which only supports
> > > > strings and uses a fixed schema, but is also very easy to use since
> it
> > > has
> > > > minimal requirements. It's really meant for demonstration purposes
> more
> > > > than anything else, but may be helpful just to get up and running.
> > > > 2. Which JsonParser error? When processing a message fails, we need
> to
> > be
> > > > careful about how we handle it. Currently it will not proceed if it
> > can't
> > > > process a message since for a lot of applications it isn't acceptable
> > to
> > > > drop messages. By default, we want at least once semantics, with
> > exactly
> > > > once as long as we don't encounter any crashes or network errors.
> > Manual
> > > > intervention is currently required in that case.
> > > >
> > > > -Ewen
> > > >
> > > > On Tue, Nov 10, 2015 at 8:58 PM, Venkatesh Rudraraju <
> > > > venkatengineer...@gmail.com> wrote:
> > > >
> > > > > Hi Ewen,
> > > > >
> > > > > Thanks for the explanation. with your suggested setting, I was able
> > to
> > > > > start just a sink connector like below :
> > > > >
> > > > > >* bin/connect-standalone.sh config/connect-standalone.properties
> > > > > config/connect-file-sink.properties*
> > > > >
> > > > > But I have a couple of issues yet,
> > > > > 1) Since I am only testing a simple file sink connector, I am
> > manually
> > > > > producing some messages to the 'connect-test' kafka topic, where
> the
> > > > > sink-Task is reading from. And it works only if the message is
> within
> > > > > double-quotes.
> > > > > 2) Once I hit the above JsonParser error on the SinkTask, the
> > connector
> > > > is
> > > > > hung, doesn't take any more messages even proper ones.
> > > > >
> > > > >
> > > > > On Tue, Nov 10, 2015 at 1:59 PM, Ewen Cheslack-Postava <
> > > > e...@confluent.io>
> > > > > wrote:
> > > > >
> > > > > > Hi Venkatesh,
> > > > > >
> > > > > > If you're using the default settings included in the sample
> > configs,
> > > > > it'll
> > > > > > expect JSON data in a special format to support passing schemas
> > along
> > > > > with
> > > > > > the 

Re: where do I get the Kafka classes

2015-11-16 Thread Ewen Cheslack-Postava
Hi Adaryl,

First, it looks like you might be trying to use the old producer interface.
That interface is going to be deprecated in favor of the new producer
(under org.apache.kafka.clients.producer). I'd highly recommend using the
new producer interface instead.

Second, perhaps this repository of examples will be a helpful starting
point: https://github.com/confluentinc/examples It's just a few basic
examples, but also includes the necessary Maven build scripts. For example,
the couple of lines after the highlighted one here:
https://github.com/confluentinc/examples/blob/master/producer/pom.xml#L32
will include the necessary jar that includes the new producer.
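
If you'd rather not use Gradle at all, the new producer is published in the
kafka-clients artifact, so a Maven dependency along these lines is all you
need (substitute whichever released version you're targeting):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
</dependency>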

-Ewen

On Mon, Nov 16, 2015 at 10:16 PM, Adaryl "Bob" Wakefield, MBA <
adaryl.wakefi...@hotmail.com> wrote:

> I'm somewhat new to java development and am studying how to write
> producers. The sample code I'm looking at has the following import
> statements:
>
> import kafka.javaapi.producer.Producer;
> import kafka.producer.KeyedMessage;
> import kafka.producer.ProducerConfig;
>
> The thing is, he doesn't use any packages that contain these classes.
> You're supposed to use Gradle to compile the code but I'm not a whiz with
> Gradle yet. I'm guessing that Gradle is somehow importing the necessary
> classes at compile time. If I didn't want to use Gradle, how would I go
> about just getting Kafka packages with the classes I need? I can't seem to
> find them by googling.
>
>
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData
>



-- 
Thanks,
Ewen


Re: kafka connect(copycat) question

2015-11-10 Thread Ewen Cheslack-Postava
Venkatesh,

1. It only works with quotes because the message needs to be parsed as JSON
-- a bare string without quotes is not valid JSON. If you're just using a
file sink, you can also try the StringConverter, which only supports
strings and uses a fixed schema, but is also very easy to use since it has
minimal requirements. It's really meant for demonstration purposes more
than anything else, but may be helpful just to get up and running.
2. Which JsonParser error? When processing a message fails, we need to be
careful about how we handle it. Currently it will not proceed if it can't
process a message since for a lot of applications it isn't acceptable to
drop messages. By default, we want at least once semantics, with exactly
once as long as we don't encounter any crashes or network errors. Manual
intervention is currently required in that case.
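
Regarding (1), switching to the StringConverter is just a worker config
change, something like:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter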

-Ewen

On Tue, Nov 10, 2015 at 8:58 PM, Venkatesh Rudraraju <
venkatengineer...@gmail.com> wrote:

> Hi Ewen,
>
> Thanks for the explanation. with your suggested setting, I was able to
> start just a sink connector like below :
>
> >* bin/connect-standalone.sh config/connect-standalone.properties
> config/connect-file-sink.properties*
>
> But I have a couple of issues yet,
> 1) Since I am only testing a simple file sink connector, I am manually
> producing some messages to the 'connect-test' kafka topic, where the
> sink-Task is reading from. And it works only if the message is within
> double-quotes.
> 2) Once I hit the above JsonParser error on the SinkTask, the connector is
> hung, doesn't take any more messages even proper ones.
>
>
> On Tue, Nov 10, 2015 at 1:59 PM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > Hi Venkatesh,
> >
> > If you're using the default settings included in the sample configs,
> it'll
> > expect JSON data in a special format to support passing schemas along
> with
> > the data. This is turned on by default because it makes it possible to
> work
> > with a *lot* more connectors and data storage systems (many require
> > schemas!), though it does mean consuming regular JSON data won't work out
> > of the box. You can easily switch this off by changing these lines in the
> > worker config:
> >
> > key.converter.schemas.enable=true
> > value.converter.schemas.enable=true
> >
> > to be false instead. However, note that this will only work with
> connectors
> > that can work with "schemaless" data. This wouldn't work for, e.g.,
> writing
> > Avro files in HDFS since they need schema information, but it might work
> > for other formats. This would allow you to consume JSON data from any
> topic
> > it already existed in.
> >
> > Note that JSON is not the only format you can use. You can also
> substitute
> > other implementations of the Converter interface. Confluent has
> implemented
> > an Avro version that works well with our schema registry (
> >
> https://github.com/confluentinc/schema-registry/tree/master/avro-converter
> > ).
> > The JSON implementation made sense to add as the one included with Kafka
> > simply because it didn't introduce any other dependencies that weren't
> > already in Kafka. It's also possible to write implementations for other
> > formats (e.g. Thrift, Protocol Buffers, Cap'n Proto, MessagePack, and
> > more), but I'm not aware of anyone who has started to tackle those
> > converters yet.
> >
> > -Ewen
> >
> > On Tue, Nov 10, 2015 at 1:23 PM, Venkatesh Rudraraju <
> > venkatengineer...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I am trying out the new kakfa connect service.
> > >
> > > version : kafka_2.11-0.9.0.0
> > > mode: standalone
> > >
> > > I have a conceptual question on the service.
> > >
> > > Can I just start a sink connector which reads from Kafka and writes to
> > say
> > > HDFS ?
> > > From what I have tried, it's expecting a source-connector as well
> because
> > > the sink-connector is expecting a particular pattern of the message in
> > > kafka-topic.
> > >
> > > Thanks,
> > > Venkat
> > >
> >
> >
> >
> > --
> > Thanks,
> > Ewen
> >
>
>
>
> --
> Victory awaits him who has everything in order--luck, people call it.
>



-- 
Thanks,
Ewen


Re: Kafka 090 maven coordinate

2015-11-03 Thread Ewen Cheslack-Postava
0.9.0.0 is not released yet, but the last blockers are being addressed and
release candidates should follow soon. The docs there are just staged as we
prepare for the release (note, e.g., that the latest release on the
downloads page http://kafka.apache.org/downloads.html is still 0.8.2.2).
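
(For reference, once the release is out the new KafkaConsumer will live in the kafka-clients artifact, so the coordinate should be something like org.apache.kafka:kafka-clients:0.9.0.0 when the artifacts reach Maven Central.)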

-Ewen

On Tue, Nov 3, 2015 at 10:57 AM, Fajar Maulana Firdaus 
wrote:

> Hi,
>
> I saw that there is new kafka client 0.9.0 in here:
> http://kafka.apache.org/090/javadoc/index.html So what is the maven
> coordinate for this version? I am asking this because it has
> KafkaConsumer api which doesn't exist in 0.8.2
>
> Thank you
>



-- 
Thanks,
Ewen


Re: Difference between storing offset on Kafka and on Zookeeper server?

2015-10-15 Thread Ewen Cheslack-Postava
There are a couple of advantages. First, scalability. Writes to Kafka are
cheaper than writes to ZK. Kafka-based offset storage is going to be able
to handle significantly more consumers (and scale out as needed since
writes are spread across all partitions in the offsets topic). Second, once
you move offsets *and* consumer group coordination off of ZK, then clients
don't need access to ZK at all.
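
To make the switch concrete, here's a minimal sketch of pointing the current (old) high-level consumer at Kafka-based offset storage -- the ZK connect string and group id are placeholders, and the dual-commit setting is only needed while migrating existing groups:

    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class KafkaOffsetStorageExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "my-group");
            // commit offsets to the __consumer_offsets topic instead of ZooKeeper
            props.put("offsets.storage", "kafka");
            // while migrating, also keep writing offsets to ZooKeeper
            props.put("dual.commit.enabled", "true");
            ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            // ... create message streams and consume as usual ...
            consumer.shutdown();
        }
    }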

-Ewen

On Wed, Oct 14, 2015 at 10:09 PM, Kiran Singh  wrote:

> What are the major advantage to store Offset on kafka server instead of
> zookeeper?
>
> Please share any link for the same.
>



-- 
Thanks,
Ewen


Re: Partition ownership with high-level consumer

2015-10-07 Thread Ewen Cheslack-Postava
And you can get the current assignment in the new consumer after the
rebalance completes too:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L593
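
Roughly, with the new consumer that looks like the sketch below (broker, group, and topic names are placeholders; the callbacks only fire from within poll()):

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class AssignmentExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // invoked before a rebalance takes partitions away from this member
                    System.out.println("Revoked: " + partitions);
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // invoked after the rebalance with the partitions this member now owns
                    System.out.println("Assigned: " + partitions);
                }
            });

            while (true) {
                consumer.poll(100); // rebalances happen (and callbacks run) inside poll()
                // consumer.assignment() also reflects the current ownership at any point
            }
        }
    }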

On Tue, Oct 6, 2015 at 5:27 PM, Gwen Shapira  wrote:

> Ah, so the new consumer got that one:
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L88
>
> On Tue, Oct 6, 2015 at 3:55 PM, Joey Echeverria  wrote:
>
> > I really only want them for the partitions I own. The client should know
> > that in order to acquire the zookeeper locks and could potentially
> execute
> > a callback to tell me the partitions I own after a rebalance.
> >
> > -Joey
> >
> > On Tue, Oct 6, 2015 at 4:08 PM, Gwen Shapira  wrote:
> >
> > > I don't think so. AFAIK, even the new API won't send this information
> to
> > > every consumer, because in some cases it can be huge.
> > >
> > >
> > >
> > > On Tue, Oct 6, 2015 at 1:44 PM, Joey Echeverria 
> wrote:
> > >
> > > > But nothing in the API?
> > > >
> > > > -Joey
> > > >
> > > > On Tue, Oct 6, 2015 at 3:43 PM, Gwen Shapira 
> > wrote:
> > > >
> > > > > Zookeeper will have this information under /consumers/<group>/owners
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Oct 6, 2015 at 12:22 PM, Joey Echeverria 
> > > > wrote:
> > > > >
> > > > > > Hi!
> > > > > >
> > > > > > Is there a way to track current partition ownership when using
> the
> > > > > > high-level consumer? It looks like the rebalance callback only
> > tells
> > > me
> > > > > the
> > > > > > partitions I'm (potentially) losing.
> > > > > >
> > > > > > -Joey
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Joey Echeverria
> > > > Director of Engineering
> > > >
> > >
> >
> >
> >
> > --
> > Joey Echeverria
> > Director of Engineering
> >
>



-- 
Thanks,
Ewen


Re: Kafka topic message consumer fetch response time checking

2015-10-20 Thread Ewen Cheslack-Postava
If you want the full round-trip latency, you need to measure that at the
client. The performance measurement tools should do this pretty accurately.
For example, if you just want to know how long it takes to produce a
message to Kafka and get an ack back, you can use the latency numbers
reported by ProducerPerformance, making sure to use a low throughput to avoid
queuing (so you measure the latency rather than queuing delay). If you
wanted producer -> brokers -> consumer latency, use EndToEndLatency.

There is some startup cost, but the tools don't start timing anything until
most of the startup has already taken place, and as long as you run a long
enough test the impact of the initial startup cost shouldn't be noticeable.
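
If you'd rather measure it from your own client, a crude hand-rolled sketch of the produce-and-ack round trip with the Java producer is below -- broker address, topic, message count/size and the 100ms pacing are all placeholders, and the pacing is only there to keep throughput low so you measure latency rather than queuing:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AckLatencyCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("acks", "1");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

            byte[] payload = new byte[100];
            for (int i = 0; i < 1000; i++) {
                long start = System.nanoTime();
                // get() blocks until the broker acks, so this is the round-trip time
                producer.send(new ProducerRecord<byte[], byte[]>("latency-test", payload)).get();
                System.out.println((System.nanoTime() - start) / 1000 + " us");
                Thread.sleep(100);
            }
            producer.close();
        }
    }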

-Ewen

On Mon, Oct 19, 2015 at 2:45 PM, David Luu  wrote:

> If one wanted to check response time for a consumer fetching a topic
> message (on the client side), similar to checking an HTTP request's
> response time for the web, what's the best approach to take?
>
> I notice the kafka shell scripts if used for that have some startup
> overhead if used to assess response times frequently.
>
> Is there a speedier CLI alternative, or will one have to write a custom
> script/program in one of the kafka language clients for less startup
> overhead for frequent periodic checks?
>
> Or is there a kafka server side metric that is sufficient to check for this
> (no need for client side checking, just look at the server metrics)?
>



-- 
Thanks,
Ewen


Re: kafka consumer shell scripts (and kafkacat) with respect to JSON pretty printing

2015-10-20 Thread Ewen Cheslack-Postava
You can accomplish this with the console consumer -- it has a formatter
flag that lets you plug in custom logic for formatting messages. The
default does not do any formatting, but if you write your own
implementation, you just need to set the flag to plug it in.

You can see an example of this in Confluent's Avro support:
https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageFormatter.java
This deserializes using Avro and writes using Avro's JSON encoding to get
nice, human-readable output. There are also wrapper scripts included to set
the appropriate flags so it is as easy to use as the normal console
consumer scripts.

If you'd like something similar, our next release will have JSON
serializers (see
https://github.com/confluentinc/schema-registry/tree/master/json-serializer)
but does not yet include the corresponding formatter. However, the
formatter implementation should look very similar to the deserializer.
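
To sketch what a custom formatter for JSON pretty printing might look like -- note the MessageFormatter interface has moved and changed between releases, so the kafka.tools.MessageFormatter trait and the writeTo(key, value, output) signature assumed here should be checked against the version you're running, and Jackson needs to be on the classpath:

    import java.io.PrintStream;
    import java.util.Properties;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import kafka.tools.MessageFormatter;

    public class JsonPrettyPrintFormatter implements MessageFormatter {
        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        public void init(Properties props) {}

        @Override
        public void writeTo(byte[] key, byte[] value, PrintStream output) {
            try {
                // parse the raw value bytes as JSON and re-emit them pretty printed
                Object json = mapper.readValue(value, Object.class);
                output.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json));
            } catch (Exception e) {
                // not valid JSON (or null) -- fall back to printing the raw bytes
                output.println(value == null ? "null" : new String(value));
            }
        }

        @Override
        public void close() {}
    }

You'd then put the compiled class (and Jackson) on the classpath and point the console consumer at it with the --formatter flag.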

-Ewen


On Fri, Oct 16, 2015 at 7:10 PM, David Luu  wrote:

> I was wondering, do the kafka consumer shell scripts (high and low level
> ones) and kafkacat do any pre-processing of the topic messages before
> outputting to stdout or does it just output "as is" in the format the
> message originally came in through kafka from the producer?
>
> Meaning pretty printed JSON produced is consumed as pretty printed JSON,
> line delimited JSON blobs (not pretty printed) is consumed the same way.
>
> I'm asking this as I notice some topics I'm consuming with the shell
> scripts (and kafkacat) are pretty printed and some not. So just wanted to
> confirm this while also checking with the developers of the topic producers
> on whether they are pretty printing on their end or not.
>
> In general, I'm assuming as best practice, it's better not to pretty print
> the JSON as a producer to save on message size for network transmission and
> file storage by kafka, since those extra newlines and spaces/tabs add up
> over time?
>



-- 
Thanks,
Ewen


Re: Documentation for 0.8.2 kafka-client api?

2015-10-08 Thread Ewen Cheslack-Postava
ConsumerConnector is part of the old consumer API (which is what is
currently released; new consumer is coming in 0.9.0). That class is not in
kafka-clients, it is in the core Kafka jar, which is named with the Scala
version you want to use, e.g. kafka_2.10.

-Ewen

On Thu, Oct 8, 2015 at 1:24 PM, Feroze Daud 
wrote:

> hi!
> where can I find a quickstart doc for kafka-client java api version 0.8.2 ?
> The documentation at http://kafka.apache.org/documentation.html does not
> seem to sync with the 0.8.2 API in the kafka-clients artifact.
> Specifically, I cannot find the class ConsumerConnector that is referenced
> here:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> Thanks
> feroze.
>



-- 
Thanks,
Ewen


Re: Better API/Javadocs?

2015-10-11 Thread Ewen Cheslack-Postava
On Javadocs, both new clients (producer and consumer) have very thorough
documentation in the javadocs. 0.9.0.0 will be the first release with the
new consumer.

On deserialization, the new consumer lets you specify deserializers just
like you do for the new producer. But the old consumer supports this as
well -- you can use the createMessageStreams(topicCountMap, keyDecoder,
valueDecoder) form to specify how keys and values should be decoded from
byte[] to a Java object.

On JSON, generally Kafka has only provided very simple
serializers/deserializers that don't introduce any additional dependencies
-- byte[], string, int, etc. Essentially the built-in support is only for a
few primitive types. There actually is a JsonSerializer and
JsonDeserializer now in trunk (and will be in 0.9.0.0) because Copycat,
Kafka's new import/export tool, needs to ship with *some* serializer that
can handle complex data. However, I'm not sure it works quite like what you
probably want -- it uses Jackson JsonNodes, whereas I'm guessing you'd want
to be able to pass in any POJO.

The next version of Confluent Platform will ship with a JsonSerializer that
has the behavior I think you're looking for -- see
https://github.com/confluentinc/schema-registry/tree/master/json-serializer/src/main/java/io/confluent/kafka/serializers.
It's also been integrated with Confluent's REST proxy.
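
In the meantime, nothing stops you from writing a small POJO deserializer yourself and plugging it into the new consumer -- a sketch along these lines (Jackson-based; the class name and error handling are just illustrative):

    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.common.serialization.Deserializer;

    public class PojoJsonDeserializer<T> implements Deserializer<T> {
        private final ObjectMapper mapper = new ObjectMapper();
        private final Class<T> type;

        public PojoJsonDeserializer(Class<T> type) {
            this.type = type;
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {}

        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null)
                return null;
            try {
                // map the raw JSON bytes onto the target POJO class
                return mapper.readValue(data, type);
            } catch (Exception e) {
                throw new RuntimeException("Failed to deserialize JSON message", e);
            }
        }

        @Override
        public void close() {}
    }

An instance of this can be passed to the KafkaConsumer constructor that takes key/value Deserializer arguments, and the mirror-image Serializer for the producer is just as short.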

-Ewen

On Sun, Oct 11, 2015 at 9:04 AM, Andrew Pennebaker <
andrew.penneba...@gmail.com> wrote:

> Will Kafka v0.9 publish official javadocs for the entire API? In 0.8,
> javadocs appear rather sparse. It's hard to find a javadoc that documents
> both Consumers and Producers.
>
> Also, will future versions of Kafka have more intuitive
> serializer/deserializer interfaces? E.g., if a Producer can configure an
> automatic POJO -> byte[] serializer, why does the Consumer API not have the
> option to configure an automatic byte[] -> POJO deserializer?
>
> Could a basic JSON serializer/deserializer be included? JSON's a common
> enough wire format that I think it's reasonable to include a decent one by
> default, so Kafka users aren't reinventing that wheel.
>
> --
> Cheers,
> Andrew
>



-- 
Thanks,
Ewen


Re: Kafka 0.9.0 release branch

2015-10-13 Thread Ewen Cheslack-Postava
Yes, 0.9 will include the new consumer.

On Tue, Oct 13, 2015 at 12:50 PM, Rajiv Kurian  wrote:

> A bit off topic but does this release contain the new single threaded
> consumer that supports the poll interface?
>
> Thanks!
>
> On Mon, Oct 12, 2015 at 1:31 PM, Jun Rao  wrote:
>
> > Hi, Everyone,
> >
> > As we are getting closer to the 0.9.0 release, we plan to cut an 0.9.0
> > release branch in about two weeks from now. In the meantime, we will try
> to
> > resolve most if not all 0.9.0 blockers listed below.
> >
> >
> >
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
> >
> > Are there any concerns? We will also discuss the 0.9.0 release in
> > tomorrow's KIP meeting.
> >
> > Thanks,
> >
> > Jun
> >
>



-- 
Thanks,
Ewen


Re: [kafka-clients] Kafka 0.9.0 release branch

2015-10-13 Thread Ewen Cheslack-Postava
Not sure if I'd call it a blocker, but if we can get it in I would *really*
like to see some solution to
https://issues.apache.org/jira/browse/KAFKA-2397 committed. Without an
explicit leave group, even normal operation of groups can leave some
partitions unprocessed for 30-60s at a time under *normal* circumstances
like a graceful shutdown/restart. It might be manageable, but that's a
pretty large availability gap...

Not that the length of tests is critical, but this is also very obvious in
system tests. The copycat tests take about 50% longer than they should
because they have to wait for the session timeout.

-Ewen

On Mon, Oct 12, 2015 at 1:31 PM, Jun Rao  wrote:

> Hi, Everyone,
>
> As we are getting closer to the 0.9.0 release, we plan to cut an 0.9.0
> release branch in about two weeks from now. In the meantime, we will try to
> resolve most if not all 0.9.0 blockers listed below.
>
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20priority%20%3D%20Blocker%20AND%20fixVersion%20%3D%200.9.0.0%20ORDER%20BY%20updated%20DESC
>
> Are there any concerns? We will also discuss the 0.9.0 release in
> tomorrow's KIP meeting.
>
> Thanks,
>
> Jun
>
> --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at http://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G_4SSzTR%3D0%3DRCVheS_3zfGXfNfqzGPQ2-TvmMT%3Dz0UObQ%40mail.gmail.com
> 
> .
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Thanks,
Ewen


Re: Http Kafka producer

2015-08-27 Thread Ewen Cheslack-Postava
Marc,

As Erik said, the behavior I was referring to is for the standard Kafka
producer, which the REST proxy uses for sending data to Kafka. Batches are
not of a predetermined fixed size or anything like that. Internally, the
producer maintains a queue of messages, keeping track of both outstanding
requests (we've sent the request to the broker, but not seen the ack yet)
and requests that we can't yet send (we already have
max.in.flight.requests.per.connection requests to the broker). If you have
nothing in the queue, you'll always be able to send immediately. The
linger.ms setting is one of the settings that controls batching. By default
it is set to 0, meaning that if it is able to send a new request (has at
least one more request allowed for the connection), it will not wait at all
for any more messages. So if the rate of messages is low enough, messages
will always be sent out immediately. If it is higher, batching will
automatically occur on messages that have to wait to be sent -- as soon as
an ack from another message is received, the producer will look at what
messages need to be sent, collect *up to* a full batch (batch.size setting)
of them, and send out a request with that data.

If you turn linger.ms up, it delays messages for the specified number of
milliseconds, providing some time for other messages to collect before
sending out a message. You might want to do this if you are willing to risk
losing that data as it sits in the in-memory queue in order to avoid making
a bunch of smaller produce requests to the server.

Note that there is *always* a risk of losing some messages if they have to
sit in the buffer on the producer. The way to minimize this is to adjust
the settings to match your workload such that messages *never* wait to be
sent. However, beware that this can result in a *lot* of very small
messages to Kafka, and each message has overhead. Sending requests that
aggressively can impact performance and throughput.
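
For reference, these knobs are all plain producer configs -- a sketch with arbitrary example values (not recommendations for your workload):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class BatchingConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            // wait up to 5ms for more records to accumulate before sending a request
            props.put("linger.ms", "5");
            // upper bound (in bytes) on how much is batched per partition per request
            props.put("batch.size", "16384");
            // how many unacknowledged requests may be in flight to a single broker
            props.put("max.in.flight.requests.per.connection", "1");
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            // ... produce as usual ...
            producer.close();
        }
    }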

-Ewen


On Thu, Aug 27, 2015 at 6:58 AM, Helleren, Erik erik.helle...@cmegroup.com
wrote:

 Hi Marc,
 That describes the behavior of the kafka producer library that batches
 writes to kafka.  The KafkaProducer javadoc explains it pretty well:
 http://kafka.apache.org/082/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

 But the general idea is that the producer will group together a bunch of
 writes to kafka for a specific topic and partition, and then send them as
 a single request.

 Durability guarantees in kafka depend on your configuration, and can be
 very weak, or very strong. Reading the kafka documentation page's sections
 about producers should make it clear which settings improve durability at
 the cost of latency and throughput.  But there would be a risk of losing
 the messages that are inside the proxy application during a failure,
 unless there is the ability to replay from the source.
 -Erik




 On 8/27/15, 12:34 AM, Marc Bollinger m...@lumoslabs.com wrote:

 Apologies if this is somewhat redundant, I'm quite new to both Kafka and
 the Confluent Platform. Ewen, when you say "Under the hood, the new
 producer will automatically batch requests."
 
 Do you mean that this is a current or planned behavior of the REST proxy?
 Are there any durability guarantees, or are batches just held in memory
 before being sent to Kafka (or some other option)?
 
 Thanks!
 
  On Aug 26, 2015, at 9:50 PM, Ewen Cheslack-Postava e...@confluent.io
 wrote:
 
  Hemanth,
 
  The Confluent Platform 1.0 version of the REST proxy doesn't have JSON
  embedded format support (i.e. direct embedding of JSON messages), but you
  can serialize, base64
  encode, and use the binary mode, paying a bit of overhead. However,
 since
  then we merged a patch to add JSON support:
  https://github.com/confluentinc/kafka-rest/pull/89 The JSON support
 does
  not interact with the schema registry at all. If you're ok building your
  own version from trunk you could use that, or this will be released with
  our next platform version.
 
  In the REST proxy, each HTTP request will result in one call to
  producer.send(). Under the hood, the new producer will automatically
 batch
  requests. The default settings will only batch when it's necessary
 (because
  there are already too many outstanding requests, so messages pile up in
 the
  local buffer), so you get the advantages of batching, but with a lower
  request rate the messages will still be sent to the broker immediately.
 
  -Ewen
 
  On Wed, Aug 26, 2015 at 9:31 PM, Hemanth Abbina
 heman...@eiqnetworks.com
  wrote:
 
  Ewen,
 
  Thanks for the explanation.
 
  We have control over the logs format coming to HAProxy. Right now,
 these
  are plain JSON logs (just like syslog messages with few additional meta
  information) sent to HAProxy from remote clients using HTTPs. No
  serialization is used.
 
  Currently, we have one log each of the HTTP request. I understood that
  every request is produced individually without batching.
 
  Will this work

Re: Http Kafka producer

2015-08-26 Thread Ewen Cheslack-Postava
Hemanth,

Can you be a bit more specific about your setup? Do you have control over
the format of the request bodies that reach HAProxy or not? If you do,
Confluent's REST proxy should work fine and does not require the Schema
Registry. It supports both binary (encoded as base64 so it can be passed
via the JSON request body) and Avro. With Avro it uses the schema registry,
but the binary mode doesn't require it.

If you don't have control over the format, then the REST proxy is not
currently designed to support that use case. I don't think HAProxy can
rewrite request bodies (beyond per-line regexes, which would be hard to
make work), so that's not an option either. It would certainly be possible
to make a small addition to the REST proxy to allow binary request bodies
to be produced directly to a topic specified in the URL, though you'd be
paying pretty high overhead per message -- without the ability to batch,
you're doing one HTTP request per messages. This might not be bad if your
messages are large enough? (Then again, the same issue applies regardless
of what solution you end up with if each of the requests to HAProxy only
contains one message).

-Ewen



On Wed, Aug 26, 2015 at 5:05 PM, Hemanth Abbina heman...@eiqnetworks.com
wrote:

 Marc,

 Thanks for your response.  Let's have more details on the problem.

 As I already mentioned in the previous post, here is our expected data
 flow:  logs -> HAProxy -> {new layer} -> Kafka Cluster

 The 'new layer' should receive logs as HTTP requests from HAproxy and
 produce the same logs to Kafka without loss.

 Options that seems to be available, are
 1. Flume: It has an HTTP source & Kafka sink, but the documentation says
 HTTP source is not for production use.
 2. Kafka Rest Proxy: Though this seems to be fine, adding another
 dependency of Schema Registry servers to validate the schema, which should
 be again used by the consumers.
 3. Custom plugin to handle this functionality: Though the functionality
 seems to be simple - scalability, reliability aspects and maintenance would
 be more work.

 Thanks
 Hemanth

 -Original Message-
 From: Marc Bollinger [mailto:m...@lumoslabs.com]
 Sent: Thursday, August 27, 2015 4:39 AM
 To: users@kafka.apache.org
 Cc: dev-subscr...@kafka.apache.org
 Subject: Re: Http Kafka producer

 I'm actually also really interested in this...I had a chat about this on
 the distributed systems slack's http://dist-sys.slack.com Kafka channel
 a few days ago, but we're not much further than griping about the problem.
 We're basically migrating an existing event system, one which packed
 messages into files, waited for a time-or-space threshold to be crossed,
 then dealt with distribution in terms of files. Basically, we'd like to
 keep a lot of those semantics: we can acknowledge success on the app server
 as soon as we've flushed to disk, and rely on the filesystem for
 durability, and total order across the system doesn't matter, as the HTTP
 PUTs sending the messages are load balanced across many app servers. We
 also can tolerate [very] long downstream event system outages,
 because...we're ultimately just writing sequentially to disk, per process
 (I should mention that this part is in Rails, which means we're dealing
 largely in terms of processes, not threads).

 RocksDB was mentioned in the discussion, but spending exactly 5 minutes
 researching that solution, it seems like the dead simplest solution on an
 app server in terms of moving parts (multiple processes writing, one
 process reading/forwarding to Kafka) wouldn't work well with RocksDB.
 Although now that I'm looking at it more, it looks like they're working on
 a MySQL storage engine?

 Anyway yeah, I'd love some discussion on this, or war stories of migration
 to Kafka from other event systems (F/OSS or...bespoke).

 On Wed, Aug 26, 2015 at 3:45 PM, Hemanth Abbina heman...@eiqnetworks.com
 wrote:

  Hi,
 
  Our application receives events through an HAProxy server over HTTPS,
  which should be forwarded and stored to Kafka cluster.
 
  What should be the best option for this ?
  This layer should receive events from HAProxy  produce them to Kafka
  cluster, in a reliable and efficient way (and should scale horizontally).
 
  Please suggest.
 
  --regards
  Hemanth
 




-- 
Thanks,
Ewen

