ProducerData jar file
Hi Experts, is the kafka.javaapi.producer.ProducerData class available in 0.8.1? I am unable to use it. I copied all the jars in /build/dependant-libs-2.10.1 and the libs folder. Any help? -- Thanks, Kishore.
Re: ProducerData jar file
Do I need to download this separately? My requirement is to ingest the data in my CSV files into Kafka; please help me do this with Java code. On Thu, Dec 11, 2014 at 1:30 PM, kishore kumar akishore...@gmail.com wrote: [...] -- Thanks, Kishore.
Re: ProducerData jar file
Hi, you just need to include the libraries available in the kafka/libs folder. Please follow the example at https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example On Thu, Dec 11, 2014 at 4:43 PM, kishore kumar akishore...@gmail.com wrote: [...]
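For the CSV-ingestion question, a minimal 0.8-style producer along the lines of that wiki example might look like the sketch below. The broker address, topic name, and file path are placeholders, and error handling is omitted:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class CsvProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092"); // placeholder broker
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

            BufferedReader br = new BufferedReader(new FileReader("data.csv")); // placeholder path
            String line;
            while ((line = br.readLine()) != null) {
                // one Kafka message per CSV line, no key
                producer.send(new KeyedMessage<String, String>("my-topic", line));
            }
            br.close();
            producer.close();
        }
    }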
Re: How to Setup MirrorMaker in Generalized way
Hi Neha, thanks for your reply. I am now using the MM tool to replicate data between Kafka clusters, but I am facing one problem: messages get duplicated if MM is killed forcefully (*kill -9*). Is there any solution to avoid these duplicate entries in the target cluster? I am using Kafka *0.8.1.1*.

On Mon, Dec 8, 2014 at 11:17 PM, Neha Narkhede n...@confluent.io wrote: Hi Madhukar, from the same documentation link you referred to: "The source and destination clusters are completely independent entities: they can have different numbers of partitions and the offsets will not be the same. For this reason the mirror cluster is not really intended as a fault-tolerance mechanism (as the consumer position will be different); for that we recommend using normal in-cluster replication. The mirror maker process will, however, retain and use the message key for partitioning so order is preserved on a per-key basis." There is no way to set up an *exact* Kafka mirror yet. Thanks, Neha

On Mon, Dec 8, 2014 at 7:47 AM, Madhukar Bharti bhartimadhu...@gmail.com wrote: Hi, I am going to set up a Kafka cluster with 3 brokers in Datacenter 1. Topics can be created from time to time, and each topic can have a varying number of partitions, mostly 1, 10, or 20. Each application might have its own partitioning algorithm that we don't know (let it be hidden from the ops team). We want to set up the mirror maker tool in such a way that partitioned data goes to the same partition in the mirror without knowing the topic's partitioning logic, and this should be *generalized* (common for all topics): partition 0 in Datacenter1 should be an exact mirror of partition 0 in Datacenter2. Please suggest a solution. If the MirrorMaker tool (http://kafka.apache.org/documentation.html#basic_ops_mirror_maker) provides any configuration which solves this use case, please let me know. Regards, Madhukar Bharti -- Thanks and Regards, Madhukar Bharti
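For reference, the documented way to run MirrorMaker in 0.8.x is roughly the following (the property file names here are placeholders). Note that MirrorMaker gives at-least-once delivery: its consumer commits offsets only periodically, so a kill -9 between commits replays, and hence duplicates, messages since the last commit; shrinking the consumer's auto.commit.interval.ms reduces but does not eliminate duplicates, and 0.8.1.1 has no exactly-once option.

    bin/kafka-mirror-maker.sh \
        --consumer.config sourceCluster.consumer.properties \
        --producer.config targetCluster.producer.properties \
        --whitelist=".*"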
Very slow producer
Hi, I’m writing my own producer to read from text files and send them line by line to the Kafka cluster. I notice that the producer is extremely slow: it’s currently sending at ~57 KB/node/s, which is 50-100 times slower than using bin/kafka-console-producer.sh. Here’s my producer:

    final File dir = new File(dataDir);
    List<File> files = new ArrayList<File>(Arrays.asList(dir.listFiles()));
    int key = 0;
    for (final File file : files) {
        try {
            BufferedReader br = new BufferedReader(new FileReader(file));
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>(topic, Integer.toString(key++), line);
                producer.send(data);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

And the partitioner:

    public int partition(Object key, int numPartitions) {
        String stringKey = (String) key;
        return Integer.parseInt(stringKey) % numPartitions;
    }

The only difference between the kafka-console-producer.sh code and my code is that I use a custom partitioner. I have no idea why it’s so slow. Best regards, Huy Le Van
Re: ProducerData jar file
Hi Manikumar, as I mentioned in my previous mail, I added the jars available in the libs folder, but this class is not in those jars. I am using Cloudera's CLABS-KAFKA. On Thu, Dec 11, 2014 at 4:55 PM, Manikumar Reddy ku...@nmsworks.co.in wrote: [...] -- Thanks, Kishore.
Re: ProducerData jar file
Hi, the kafka.javaapi.producer.ProducerData class belongs to Kafka 0.6/0.7. This class was removed from 0.8 onwards. Please try the 0.8.x API. Regards, Manikumar On Thu, Dec 11, 2014 at 8:10 PM, kishore kumar akishore...@gmail.com wrote: [...]
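For anyone else migrating: as a rough sketch (topic and message strings are illustrative), the 0.7 ProducerData call maps onto the 0.8 KeyedMessage API like this:

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;

    // Kafka 0.6/0.7 style (kafka.javaapi.producer.ProducerData, removed in 0.8):
    //   producer.send(new ProducerData<String, String>("my-topic", "a message"));

    // Kafka 0.8 style (kafka.producer.KeyedMessage):
    producer.send(new KeyedMessage<String, String>("my-topic", "a message"));
    // keyed variant, if you partition by key:
    producer.send(new KeyedMessage<String, String>("my-topic", "a key", "a message"));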
There is no --delete option (version kafka_2.9.2-0.8.1.1)
There is no --delete option in version kafka_2.9.2-0.8.1.1. -- Zhang Zongjun, Operations Technology Department, Leju Holdings (www.leju.com)
Re: OutOfMemoryException when starting replacement node.
Agree that the docs can be better. Perhaps you want to open a JIRA (at issues.apache.org) with this suggestion?

On Wed, Dec 10, 2014 at 4:03 PM, Solon Gordon so...@knewton.com wrote: I see, thank you for the explanation. You might consider being more explicit about this in your documentation. We didn't realize we needed to take the (partitions * fetch size) calculation into account when choosing partition counts for our topics, so this is a bit of a rude surprise.

On Wed, Dec 10, 2014 at 3:50 PM, Gwen Shapira gshap...@cloudera.com wrote: Ah, found where we actually size the request as partitions * fetch size. Thanks for the correction, Jay, and sorry for the mix-up, Solon.

On Wed, Dec 10, 2014 at 10:41 AM, Jay Kreps j...@confluent.io wrote: Hey Solon, the 10MB size is per-partition. The rationale for this is that the per-partition fetch size is effectively a maximum message size. However, with so many partitions on one machine this leads to a very large fetch size. We don't do a great job of scheduling these to stay under a memory bound today. Ideally the broker and consumer should do something intelligent to stay under a fixed memory budget; this is something we'd like to address as part of the new consumer. For now you need to either bump up your heap or decrease your fetch size. -jay

On Wed, Dec 10, 2014 at 7:48 AM, Solon Gordon so...@knewton.com wrote: I just wanted to bump this issue to see if anyone has thoughts. Based on the error message, it seems like the broker is attempting to consume nearly 2GB of data in a single fetch. Is this expected behavior? Please let us know if more details would be helpful or if it would be better for us to file a JIRA issue. We're using Kafka 0.8.1.1. Thanks, Solon

On Thu, Dec 4, 2014 at 12:00 PM, Dmitriy Gromov dmit...@knewton.com wrote: Hi, we were recently trying to replace a broker instance and were getting an OutOfMemoryError when the new node came up. The issue happened during the log replication phase. We were able to circumvent it by copying all of the logs to the new node before starting it. Details:
- The heap size on the old and new node was 8GB.
- There was about 50GB of log data to transfer.
- There were 1548 partitions across 11 topics.
- We recently increased num.replica.fetchers to solve the problem described in https://issues.apache.org/jira/browse/KAFKA-1196. However, the replacement process worked when we first changed that value.

[2014-12-04 12:10:22,746] ERROR OOME with size 1867671283 (kafka.network.BoundedByteBufferReceive)
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
    at kafka.network.BoundedByteBufferReceive.byteBufferAllocate(BoundedByteBufferReceive.scala:80)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:63)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:73)
    at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:109)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:108)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:107)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:96)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Thank you
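A back-of-the-envelope check (assuming the 10 MB per-partition fetch size discussed above; the thread does not state the exact fetch size or fetcher count) shows why the allocation is so large. A fetch request is sized at roughly (partitions per fetcher thread) × (per-partition fetch size), and

    1,867,671,283 bytes ≈ 178 × 10 MiB

which is plausible for 1548 partitions divided among a handful of replica fetcher threads (e.g. 1548 / 8 fetchers ≈ 194 partitions each).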
Re: Is Kafka documentation regarding null key misleading?
Guozhang, can you point me to the code that implements the periodic/sticky random partitioner? I would actually like to try it out in our environment, even though I assume it is NOT ported to the 0.8.2 Java producer. Thanks, Steven

On Mon, Dec 8, 2014 at 1:43 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Yury, originally the producer behavior under a null key was random per message, but this was later changed to the periodic random assignment to reduce the number of sockets on the server side: imagine you have n brokers and m producers where m >> n; with per-message random distribution each server will need to maintain a socket with each of the m producers. We realized that this change IS misleading, and we have changed back to per-message random in the new producer released in 0.8.2. Guozhang

On Fri, Dec 5, 2014 at 10:43 AM, Andrew Jorgensen ajorgen...@twitter.com.invalid wrote: If you look under Producer configs you will see the key 'topic.metadata.refresh.interval.ms' with a default of 600 * 1000 (10 minutes). It is not entirely clear, but this controls how often a producer with a null-key partitioner will switch the partition it is writing to. In my production app I set this down to 1 minute and haven't seen any ill effects, but it is worth noting that the shorter you set it, the more overhead it *could* cause. I agree this could probably be a little clearer in the documentation. - Andrew Jorgensen @ajorgensen

On December 5, 2014 at 1:34:00 PM, Yury Ruchin (yuri.ruc...@gmail.com) wrote: Hello, I've come across a (seemingly) strange situation where my Kafka producer gave a very uneven distribution across partitions. I found that I was producing messages with a null key, guided by the following clause in the documentation: "If the key is null, then a random broker partition is picked." However, after looking at the code, I found that the broker partition is not truly random for every message; instead, the randomly picked partition number sticks and only refreshes after topic.metadata.refresh.interval.ms expires, which is 10 minutes by default. So, with a null key the producer keeps writing to the same partition for 10 minutes. Is my understanding of partitioning with a null key correct? If yes, shouldn't the documentation be fixed to explicitly describe the sticky pseudo-random partition assignment? Thanks, Yury -- Guozhang
Re: Very slow producer
Did you set producer.type to async when creating your producer? The console producer uses async by default, but the default producer config is sync. -Ewen On Thu, Dec 11, 2014 at 6:08 AM, Huy Le Van huy.le...@insight-centre.org wrote: [...] -- Thanks, Ewen
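For completeness, the change is a single producer property; a sketch of an async configuration for the old (0.8) producer follows, with an illustrative broker list and batch settings:

    Properties props = new Properties();
    props.put("metadata.broker.list", "localhost:9092");        // placeholder broker list
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("producer.type", "async");        // default is "sync"; async batches sends
    props.put("batch.num.messages", "200");     // messages per async batch (0.8 default: 200)
    props.put("queue.buffering.max.ms", "500"); // max time to buffer before flushing a batch
    Producer<String, String> producer =
        new Producer<String, String>(new ProducerConfig(props));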
How do I create a consumer group
We're using 0.8.2 beta and a homegrown C++ async library based on Boost.Asio that has support for the offset API (api keys: OffsetCommitRequest = 8, OffsetFetchRequest = 9, ConsumerMetadataRequest = 10). If we use a Java client and commit an offset, then the consumer group shows up in the response from ConsumerMetadataRequest. However, I can't figure out how to create a new one using the Kafka API. Also, my __consumer_offsets topic shows up with a replication factor of 1. Is that changeable? Thanks, svante
Given brokers, is it possible to know all the ZooKeepers the brokers connect to
Hi guys, if I know the brokers, is there a way to find the ZooKeeper hosts from the broker list? Thanks! Siyuan
Re: [DISCUSSION] adding the serializer api back to the new java producer
Thanks, Jun. I think we all understand the motivation for adding the serialization API back; we are just proposing different ways of doing so. I personally prefer not to bind the producer instance to a fixed serialization, but that said, I am fine with the current proposal too, as this can still be done via other workarounds. Guozhang

On Tue, Dec 9, 2014 at 3:46 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi all, this is very likely when you have a large site such as LinkedIn and you have thousands of servers producing data. You will have a mixed bag of producers and serialization/deserialization formats because of incremental code deployment. So it is best to keep the API as generic as possible, and each org/company can wrap the generic API with whatever serialization/deserialization framework fits them (Java, protobuf, Avro, or Base64). Keep the API as generic as possible. Thanks, Bhavesh

On Tue, Dec 9, 2014 at 3:29 PM, Steven Wu stevenz...@gmail.com wrote: "In practice the cases that actually mix serialization types in a single stream are pretty rare I think [...]" I agree that it is unlikely to have mixed serialization formats for one topic/type. But we sometimes/often create one Producer object for one cluster, and there can be many topics on that cluster; different topics may have different serialization formats. So I agree with Guozhang's point regarding the data-type flexibility of using a simple byte[] (instead of generic <K, V>).

On Fri, Dec 5, 2014 at 5:00 PM, Jay Kreps j...@confluent.io wrote: Hey Sriram, thanks! I think this is a very helpful summary. Let me try to address your point about passing in the serde at send time. I think the first objection is really to the paired key/value serializer interfaces. This leads to kind of a weird combinatorial thing where you would have an avro/avro serializer, a string/avro serializer, a pb/pb serializer, a string/pb serializer, and so on. But your proposal would work as well with separate serializers for key and value. I think the downside is just the one you call out: this is a corner case and you end up with two versions of all the APIs to support it. This also makes the serializer API more annoying to implement. I think the alternative solution for this case, and any other we can give people, is just configuring ByteArraySerializer, which gives you basically the API that you have now with byte arrays. If this were incredibly common then it would be a silly solution, but I guess the belief is that these cases are rare and a really well-implemented Avro or JSON serializer should be 100% of what most people need. In practice, the cases that actually mix serialization types in a single stream are pretty rare, I think, just because the consumer then has the problem of guessing how to deserialize, so most of these will end up with at least some marker or schema id or whatever that tells you how to read the data. Arguably this mixed serialization with a marker is itself a serializer type and should have a serializer of its own... -Jay

On Fri, Dec 5, 2014 at 3:48 PM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote: This thread has diverged multiple times now and it would be worth summarizing them.
There seem to be the following points of discussion:

1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (this is what we have today)? The point for this is to keep the interface simple and its usage easy to understand. The point against it is that it becomes hard to share common usage patterns around serialization/message validation in the future.

2. Can we create a wrapper producer that does the serialization and have different variants of it for different data formats? The point for this is again to keep the main API clean. The points against it are that it duplicates the API, increases the surface area, and creates redundancy for a minor addition.

3. Do we need to support different data types per record? The current interface (bytes in / bytes out) lets you instantiate one producer and use it to send multiple data formats. There seem to be some valid use cases for this, and I have still not seen a strong argument for dropping this functionality. Can someone provide their views on why we don't need this support, which is possible with the current API? One possible
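To make options 1 and 3 above concrete, here is an illustrative sketch; the interface and method names are assumptions for illustration, not the actual proposal:

    // Serialization bound to the producer instance: the producer is
    // constructed with serializers, and every send() uses the same format.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Bytes in / bytes out (today's API): one producer instance can carry
    // any format, because each caller serializes its own records:
    //   producer.send("topic-a", avroEncode(recordA));   // Avro on one topic
    //   producer.send("topic-b", jsonEncode(recordB));   // JSON on another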
Re: Very slow producer
Hi Ewen, thank you for your response. It’s much faster after changing to async. Cheers, Huy Le Van On Thursday, Dec 11, 2014 at 7:08 p.m., Ewen Cheslack-Postava e...@confluent.io wrote: [...]
Re: Is Kafka documentation regarding null key misleading?
Steven, you can take a look at kafka.producer.async.DefaultEventHandler, in the getPartition function. Guozhang On Thu, Dec 11, 2014 at 9:58 AM, Steven Wu stevenz...@gmail.com wrote: [...] -- Guozhang
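The behavior in question reduces to something like the following simplified sketch (the real implementation is Scala inside DefaultEventHandler; the Java class and method names here are illustrative only):

    // Simplified idea of the 0.8.1 null-key behavior: pick a random partition
    // once, then reuse it; the cached choice is discarded when topic metadata
    // is refreshed (topic.metadata.refresh.interval.ms, default 10 minutes).
    class StickyRandomPartitioner {
        private Integer cachedPartition = null;
        private final java.util.Random rand = new java.util.Random();

        // Called on every send with a null key.
        int partition(int numPartitions) {
            if (cachedPartition == null || cachedPartition >= numPartitions) {
                cachedPartition = rand.nextInt(numPartitions); // stick until refresh
            }
            return cachedPartition;
        }

        // Called when topic metadata is refreshed.
        void onMetadataRefresh() {
            cachedPartition = null;
        }
    }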