Re: Java 8 stream consumer pattern
Thanks, Matthias, I had not considered this. I will take a look at the Streams use case.

On Wed, May 17, 2017 at 4:58 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Did you try out the Kafka Streams API instead of wrapping the consumer? It does support lambdas already:
>
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java#L126
>
> Full docs: http://docs.confluent.io/current/streams/index.html
>
> -Matthias
>
> On 5/17/17 1:45 PM, Robert Quinlivan wrote:
>> Hello,
>>
>> I've been looking at writing a Java 8 streams API wrapper for the Kafka consumer. Since this seems like a common use case, I was wondering if someone in the user community had already begun a project like this.
>>
>> My goal is to be able to get back a Stream<ConsumerRecord<K, V>> wrapping the results of #poll() which can then be passed into a map/filter pipeline. I am using an underlying blocking queue data structure to buffer in memory and using Stream.generate() to pull records. Any recommendations on a best approach here?
>>
>> Thanks

--
Robert Quinlivan
Software Engineer, Signal
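[Editor's note: a minimal sketch of the shape of the linked example — not a copy of it — assuming the 0.10.x-era Streams API (KStreamBuilder). The application id, topic names, and the uppercase mapping are placeholders:]

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;

public class MapLambdaSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-lambda-sketch"); // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KStreamBuilder builder = new KStreamBuilder();
        // A lambda goes straight into the topology; no consumer wrapper is needed.
        KStream<byte[], String> text =
                builder.stream(Serdes.ByteArray(), Serdes.String(), "input-topic");
        text.mapValues(v -> v.toUpperCase())
            .to(Serdes.ByteArray(), Serdes.String(), "output-topic");

        new KafkaStreams(builder, props).start();
    }
}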
Java 8 stream consumer pattern
Hello,

I've been looking at writing a Java 8 streams API wrapper for the Kafka consumer. Since this seems like a common use case, I was wondering if someone in the user community had already begun a project like this.

My goal is to be able to get back a Stream<ConsumerRecord<K, V>> wrapping the results of #poll() which can then be passed into a map/filter pipeline. I am using an underlying blocking queue data structure to buffer in memory and using Stream.generate() to pull records. Any recommendations on a best approach here?

Thanks

--
Robert Quinlivan
Software Engineer, Signal
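[Editor's note: a minimal sketch of the buffering approach described above, assuming a dedicated poll thread (KafkaConsumer is not thread-safe) and ignoring offset commits and shutdown. All names are hypothetical:]

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;

public class ConsumerStreamWrapper<K, V> {
    private final BlockingQueue<ConsumerRecord<K, V>> buffer = new LinkedBlockingQueue<>(10_000);

    // Poll loop runs on its own thread; only this thread touches the consumer.
    public void startPolling(KafkaConsumer<K, V> consumer) {
        new Thread(() -> {
            while (true) {
                ConsumerRecords<K, V> records = consumer.poll(100);
                for (ConsumerRecord<K, V> record : records) {
                    try {
                        buffer.put(record); // blocks when the buffer is full (back-pressure)
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }).start();
    }

    // Infinite stream; each element is taken from the buffer as it arrives.
    public Stream<ConsumerRecord<K, V>> stream() {
        return Stream.generate(() -> {
            try {
                return buffer.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        });
    }
}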
Re: Strict ordering of messages in Kafka
You can do it, but the performance trade-off will be very costly. There is no ordering guarantee between partitions, so you would have to work with topics with only one partition. I believe you would also need to set batch.size in the producer to 1 to ensure the messages aren't re-ordered within the producer buffer. As far as I'm aware, acks=all isn't necessary.

On Mon, May 15, 2017 at 10:50 AM, João Peixoto <joao.harti...@gmail.com> wrote:
> AFAIK that is not possible out of the box, as that would require synchronization across multiple threads/instances. The throughput of such an approach would be terrible, as the parallelism of KStreams is tightly coupled with the number of partitions.
>
> I'd say if you need such ordering you should reconsider the partitioning key, as that would be the way to do it.
>
> On Mon, May 15, 2017 at 8:22 AM Wong, Janette <janette.w...@rbc.com.invalid> wrote:
>> Hi,
>>
>> Is it that there is no way to guarantee strict ordering of messages within Kafka across partitions? Within a single partition, the only way for strict ordering is to set max.in.flight.requests.per.connection=1 and acks=all?
>>
>> Thanks,
>> Janette
>>
>> Janette Wong, P.Eng. | janette.w...@rbc.com | Senior Application Architect - Analytics | Digital Architecture | T: 416.348.6051 | M: 647.221.7836 | RBC WPP, 3rd floor

--
Robert Quinlivan
Software Engineer, Signal
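[Editor's note: a sketch of the producer settings discussed in this thread, for a topic created with a single partition. Whether acks=all and batch.size=1 are strictly required is debated above, so they are not asserted here; the broker address and serializers are placeholders:]

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

public class OrderedProducerConfig {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // With retries enabled, more than one in-flight request can reorder batches.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        // props.put(ProducerConfig.ACKS_CONFIG, "all"); // for durability; debated above for ordering
        // The target topic must have exactly one partition for a global order.
        return new KafkaProducer<>(props);
    }
}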
Re: possible kafka bug, maybe in console producer/consumer utils
> ... or even a single read so I'll try here. My original msg, slightly edited, was:
>
> (Windows 2K8R2 fully patched, 16GB RAM, fairly modern dual core Xeon server, latest version of Java)
>
> I've spent several days trying to sort out unexpected behaviour involving Kafka and the Kafka console producer and consumer.
>
> If I set the console producer and console consumer to look at the same topic then I can type lines into the producer window and see them appear in the consumer window, so it works.
>
> If I try to pipe in large amounts of data to the producer, some gets lost and the producer reports errors, e.g.
>
> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic big_ptns1_repl1_nozip with key: null, value: 55 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.TimeoutException: Batch containing 8 record(s) expired due to timeout while requesting metadata from brokers for big_ptns1_repl1_nozip-0
>
> I'm using as input a file of either Shakespeare's full works (about 5.4 meg ASCII), or a much larger file of Shakespeare's full works replicated 900 times to make it about 5GB. Lines are ASCII and short, and each line should be a single record when read in by the console producer. I need to do some benchmarking on time and space and this was my first try.
>
> As mentioned, data gets lost. I presume it is expected that any data we pipe into the producer should arrive in the consumer, so if I do this in one windows console:
>
> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic big_ptns1_repl1_nozip --zookeeper localhost:2181 > F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
>
> and this in another:
>
> kafka-console-producer.bat --broker-list localhost:9092 --topic big_ptns1_repl1_nozip < F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
>
> then the output file "single_all_shakespear_OUT.txt" should be identical to the input file "complete_works_no_bare_lines.txt", except it's not. For the complete works (about 5.4 meg uncompressed) I lost about 130K in the output. For the replicated Shakespeare, which is about 5GB, I lost about 150 meg.
>
> This can't be right surely, and it's repeatable, but happens at different places in the file when errors start to be produced, it seems.
>
> I've done this using all 3 versions of Kafka in the 0.10.x.y branch and I get the same problem (the above commands were using the 0.10.0.0 branch so they look a little obsolete but they are right for that branch I think). It's cost me some days.
>
> So, am I making a mistake, if so what?
>
> thanks
>
> jan

--
Robert Quinlivan
Software Engineer, Signal
Kafka producer does not respect added partitions
Hello,

I have added partitions to a topic. The new partitions appear in the consumer assignments and in the topics listing they have the correct number of ISRs. However, the producer still does not write to the new partitions. My producer writes in a round-robin fashion, using the Cluster's reported partition count.

I have seen no mention of a need to restart or reconfigure the producer in order to pick up the added partitions. Is this required?

Thanks

--
Robert Quinlivan
Software Engineer, Signal
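[Editor's note: one hedged possibility, not confirmed in this thread: if the round-robin logic counts partitions from the producer's cached Cluster metadata, new partitions only become visible after a metadata refresh, which is bounded by metadata.max.age.ms (default 5 minutes). A sketch for checking what the producer currently sees; the topic name is a placeholder:]

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Properties;

public class PartitionCountCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "30000"); // refresh metadata every 30s

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // partitionsFor() blocks on a metadata fetch for the topic if needed,
            // so it reflects the broker's current partition count.
            List<PartitionInfo> partitions = producer.partitionsFor("my_topic");
            System.out.println("Producer sees " + partitions.size() + " partitions");
        }
    }
}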
Re: Kafka consumer offset reset
What offset do you want to reset them to?

The easiest way to adjust offsets in 0.10 is to attach a consumer for the target topic-partition, seek to the position you desire, and commit that new offset.

On Tue, Mar 21, 2017 at 9:56 AM, Jakub Stransky <stransky...@gmail.com> wrote:
> Hello,
>
> just recently migrated to using Kafka 0.10.1.0
>
> I would like to reset the position for some consumers. I went through the documentation and couldn't spot how to achieve that. All I found is that v0.10 reduces the usage of ZooKeeper and clients have the possibility to use different storage for maintaining the offsets.
>
> Could someone more experienced elaborate a bit on this topic?
>
> Thanks
> jakub

--
Robert Quinlivan
Software Engineer, Signal
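[Editor's note: a minimal sketch of that adjust-and-commit approach, assuming the consumers being reset are stopped first. The group, topic, partition, and offset are placeholders:]

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.Properties;

public class OffsetReset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_group"); // the group whose offset is being moved
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my_topic", 0);
            consumer.assign(Collections.singletonList(tp)); // assign() skips group rebalancing
            // Commit an explicit offset for this partition under the group's name.
            consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(12345L)));
        }
    }
}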
Re: ConsumerRebalanceListerner
Does the consumer or producer log anything when you connect the new consumer? Is there anything logged in the broker logs?

On Mon, Mar 20, 2017 at 10:58 AM, jeffrey venus <jeffrey.ve...@gmail.com> wrote:
> Hi,
>
> I am trying to use Kafka as a messaging platform for my microservices. There is a problem I am facing in real time: when I try to bring in a new consumer group to consume from a certain topic, I have to restart the producer; only then does it (the new consumer group) start consuming. Is there any other way, without disturbing the producer?
>
> Regards
> V G Sunjay Jeffrish

--
Robert Quinlivan
Software Engineer, Signal
Re: Offset commit request failing
Thanks for the response. Reading through that thread, it appears that this issue was addressed with KAFKA-3810 <https://issues.apache.org/jira/browse/KAFKA-3810>. This change eases the restriction on fetch size between replicas.

However, should the outcome be a more comprehensive change to the serialization format of the request? The size of the group metadata currently grows linearly with the number of topic-partitions. This is difficult to tune for in a configuration using topic auto creation.

On Fri, Mar 17, 2017 at 3:17 AM, James Cheng <wushuja...@gmail.com> wrote:
> I think it's due to the high number of partitions and the high number of consumers in the group. The group coordination info to keep track of the assignments actually happens via a message that travels through the __consumer_offsets topic. So with so many partitions and consumers, the message gets too big to go through the topic.
>
> There is a long thread here that discusses it. I don't remember what specific actions came out of that discussion. http://search-hadoop.com/m/Kafka/uyzND1yd26N1rFtRd1?subj=+DISCUSS+scalability+limits+in+the+coordinator
>
> -James
>
> Sent from my iPhone
>
>> On Mar 15, 2017, at 9:40 AM, Robert Quinlivan <rquinli...@signal.co> wrote:
>>
>> I should also mention that this error was seen on broker version 0.10.1.1. I found that this condition sounds somewhat similar to KAFKA-4362 <https://issues.apache.org/jira/browse/KAFKA-4362>, but that issue was submitted in 0.10.1.1 so they appear to be different issues.
>>
>> On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan <rquinli...@signal.co> wrote:
>>> Good morning,
>>>
>>> I'm hoping for some help understanding the expected behavior for an offset commit request and why this request might fail on the broker.
>>>
>>> *Context:*
>>>
>>> For context, my configuration looks like this:
>>>
>>> - Three brokers
>>> - Consumer offsets topic replication factor set to 3
>>> - Auto commit enabled
>>> - The user application topic, which I will call "my_topic", has a replication factor of 3 as well and 800 partitions
>>> - 4000 consumers attached in consumer group "my_group"
>>>
>>> *Issue:*
>>>
>>> When I attach the consumers, the coordinator logs the following error message repeatedly for each generation:
>>>
>>> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for group my_group generation 2066 failed due to org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN error code to the client (kafka.coordinator.GroupMetadataManager)
>>>
>>> *Observed behavior:*
>>>
>>> The consumer group does not stay connected long enough to consume messages. It is effectively stuck in a rebalance loop and the "my_topic" data has become unavailable.
>>>
>>> *Investigation:*
>>>
>>> Following the Group Metadata Manager code, it looks like the broker is writing to a cache after it writes an Offset Commit Request to the log file. If this cache write fails, the broker then logs this error and returns an error code in the response. In this case, the error from the cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException. However, the broker then sets the error code to UNKNOWN on the Offset Commit Response.
>>>
>>> It seems that the issue is the size of the metadata in the Offset Commit Request. I have the following questions:
>>>
>>> 1. What is the size limit for this request? Are we exceeding the size which is causing this request to fail?
>>> 2. If this is an issue with metadata size, what would cause abnormally large metadata?
>>> 3. How is this cache used within the broker?
>>>
>>> Thanks in advance for any insights you can provide.
>>>
>>> Regards,
>>> Robert Quinlivan
>>> Software Engineer, Signal

--
Robert Quinlivan
Software Engineer, Signal
Re: Offset commit request failing
I should also mention that this error was seen on broker version 0.10.1.1. I found that this condition sounds somewhat similar to KAFKA-4362 <https://issues.apache.org/jira/browse/KAFKA-4362>, but that issue was submitted in 0.10.1.1 so they appear to be different issues.

On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan <rquinli...@signal.co> wrote:
> Good morning,
>
> I'm hoping for some help understanding the expected behavior for an offset commit request and why this request might fail on the broker.
>
> *Context:*
>
> For context, my configuration looks like this:
>
> - Three brokers
> - Consumer offsets topic replication factor set to 3
> - Auto commit enabled
> - The user application topic, which I will call "my_topic", has a replication factor of 3 as well and 800 partitions
> - 4000 consumers attached in consumer group "my_group"
>
> *Issue:*
>
> When I attach the consumers, the coordinator logs the following error message repeatedly for each generation:
>
> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for group my_group generation 2066 failed due to org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN error code to the client (kafka.coordinator.GroupMetadataManager)
>
> *Observed behavior:*
>
> The consumer group does not stay connected long enough to consume messages. It is effectively stuck in a rebalance loop and the "my_topic" data has become unavailable.
>
> *Investigation:*
>
> Following the Group Metadata Manager code, it looks like the broker is writing to a cache after it writes an Offset Commit Request to the log file. If this cache write fails, the broker then logs this error and returns an error code in the response. In this case, the error from the cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException. However, the broker then sets the error code to UNKNOWN on the Offset Commit Response.
>
> It seems that the issue is the size of the metadata in the Offset Commit Request. I have the following questions:
>
> 1. What is the size limit for this request? Are we exceeding the size which is causing this request to fail?
> 2. If this is an issue with metadata size, what would cause abnormally large metadata?
> 3. How is this cache used within the broker?
>
> Thanks in advance for any insights you can provide.
>
> Regards,
> Robert Quinlivan
> Software Engineer, Signal

--
Robert Quinlivan
Software Engineer, Signal
Offset commit request failing
Good morning,

I'm hoping for some help understanding the expected behavior for an offset commit request and why this request might fail on the broker.

*Context:*

For context, my configuration looks like this:

- Three brokers
- Consumer offsets topic replication factor set to 3
- Auto commit enabled
- The user application topic, which I will call "my_topic", has a replication factor of 3 as well and 800 partitions
- 4000 consumers attached in consumer group "my_group"

*Issue:*

When I attach the consumers, the coordinator logs the following error message repeatedly for each generation:

ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for group my_group generation 2066 failed due to org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN error code to the client (kafka.coordinator.GroupMetadataManager)

*Observed behavior:*

The consumer group does not stay connected long enough to consume messages. It is effectively stuck in a rebalance loop and the "my_topic" data has become unavailable.

*Investigation:*

Following the Group Metadata Manager code, it looks like the broker is writing to a cache after it writes an Offset Commit Request to the log file. If this cache write fails, the broker then logs this error and returns an error code in the response. In this case, the error from the cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException. However, the broker then sets the error code to UNKNOWN on the Offset Commit Response.

It seems that the issue is the size of the metadata in the Offset Commit Request. I have the following questions:

1. What is the size limit for this request? Are we exceeding the size which is causing this request to fail?
2. If this is an issue with metadata size, what would cause abnormally large metadata?
3. How is this cache used within the broker?

Thanks in advance for any insights you can provide.

Regards,
Robert Quinlivan
Software Engineer, Signal
Re: Question on Metadata
Did you look at the ConsumerRecord <https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html> class?

On Tue, Mar 14, 2017 at 11:09 AM, Syed Mudassir Ahmed <smudas...@snaplogic.com> wrote:
> Can anyone help?
>
> -- Forwarded message --
> From: "Syed Mudassir Ahmed" <smudas...@snaplogic.com>
> Date: Mar 14, 2017 12:28 PM
> Subject: Question on Metadata
> To: <users@kafka.apache.org>
> Cc:
>
> Hi guys,
>
> When we consume a JMS message, we get a Message object that has methods to fetch implicit metadata provided by the JMS server: http://docs.oracle.com/javaee/6/api/javax/jms/Message.html. There are methods to fetch that implicit metadata such as Expiration, Correlation ID, etc.
>
> Is there a way to fetch any such implicit metadata while consuming a Kafka message? Any pointers to do that would be greatly appreciated.
>
> Thanks in advance.
>
> --
> Thanks,
> Syed.

--
Robert Quinlivan
Software Engineer, Signal
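[Editor's note: a sketch of reading the per-record metadata off ConsumerRecord, which is the closest analogue to the JMS header fields; the broker address, group, topic, and types are placeholders:]

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Collections;
import java.util.Properties;

public class RecordMetadataExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "metadata-example");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my_topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // Broker-supplied metadata carried on every record (0.10+ adds timestamps):
                System.out.printf("topic=%s partition=%d offset=%d timestamp=%d key=%s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.timestamp(), record.key());
            }
        }
    }
}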
Re: Simple KafkaProducer to handle multiple requests or not
There is no need to create a new producer instance for each write request. In doing so you lose the advantages of the buffering and batching that the producer offers. In your use case I would recommend having a single running producer and tuning the batch size and linger.ms settings if you find that the producer is using too much memory.

On Mon, Mar 13, 2017 at 5:05 AM, Amit K <amitk@gmail.com> wrote:
> Hi,
>
> I am using the simple Kafka producer (Java based, version 0.9.0.0) in an application where I receive a lot of hits (about 50 per second, much like a servlet) on the application that has the Kafka producer. A different set of records comes with each request.
>
> I am using only one instance of the Kafka producer to push the records to the Kafka cluster. Is that a good way to use the Kafka producer? As mentioned in the documentation, the KafkaProducer can be shared across multiple threads.
>
> Or should there be one Kafka producer created to handle each request?
>
> Is there any best practice document/guidelines to follow for using the simple Java Kafka producer API?
>
> Thanks in advance for your responses.
>
> Thanks,
> Amit

--
Robert Quinlivan
Software Engineer, Signal
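[Editor's note: a sketch of the single shared producer described above. The tuning values shown are the library defaults used as placeholders, not recommendations:]

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SharedProducer {
    // One producer per process; KafkaProducer is thread-safe and batches across threads.
    private static final KafkaProducer<String, String> PRODUCER = create();

    private static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "16384");       // bytes per partition batch (the default)
        props.put("linger.ms", "5");            // wait up to 5 ms to fill a batch
        props.put("buffer.memory", "33554432"); // total buffering memory (the 32 MB default)
        return new KafkaProducer<>(props);
    }

    public static void send(String topic, String key, String value) {
        PRODUCER.send(new ProducerRecord<>(topic, key, value));
    }
}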
Re: How to set offset for a consumer in Kafka 0.10.0.X
The best approach would be:

- Have all consumers in your group shut down
- Have an offset reset tool join with the same group name as above
- The offset tool subscribes to all topic-partitions, seeks to the desired offset, and commits
- The offset tool shuts down
- Consumers then restart and re-join the consumer group, resuming at the offsets that were last committed for each topic-partition

A minimal sketch of such a reset tool is included below the quoted thread.

On Wed, Mar 8, 2017 at 10:51 AM, Jeff Widman <j...@netskope.com> wrote:
> Yeah, that gist looks like it *should* work. I haven't tested it so can't guarantee.
>
> On Tue, Mar 7, 2017 at 7:04 PM, Glen Ogilvie <glen.ogil...@oss.co.nz> wrote:
>> Hi Jeff,
>>
>> Yes, the work I am doing is ops work. Logstash is consuming from the topic + consumer group, and I don't want it to start at the beginning, but rather at a specific offset, so setting the offset for the consumer group externally, then starting up Logstash, is my goal.
>>
>> I'm still a little unclear as to how to do this.
>>
>> Is this python script: http://pastebin.com/tvxj1wTX
>>
>> the right way to go about getting the offset set to a specific value (12345678 in this example) for a specific consumer group?
>>
>> Regards
>> --
>> Glen Ogilvie
>> Open Systems Specialists
>> Level 1, 162 Grafton Road
>> http://www.oss.co.nz/
>>
>> Ph: +64 9 984 3000
>> Mobile: +64 21 684 146
>> GPG Key: ACED9C17
>>
>> From: "Jeff Widman" <j...@netskope.com>
>> To: "users" <users@kafka.apache.org>
>> Sent: Wednesday, 8 March, 2017 1:41:17 PM
>> Subject: Re: How to set offset for a consumer in Kafka 0.10.0.X
>>
>> Offsets for modern Kafka consumers are stored in an internal Kafka topic, so they aren't as easy to change as ZooKeeper.
>>
>> To set a consumer offset, you need a consumer within a consumer group to call commit() with your explicit offset. If needed, you can create a dummy consumer and tell it to join an existing consumer group.
>>
>> Take a look at this migration script, especially the part where it commits the offset, to see how it can work in Scala: https://github.com/apache/kafka/pull/2615/files
>>
>> It's fairly straightforward to do this within most other clients as well. If you're doing some quick ops work where you don't want to spin up the JVM, then it's fairly easy to do this using kafka-python.
>>
>> On Tue, Mar 7, 2017 at 4:08 PM, Glen Ogilvie <glen.ogil...@oss.co.nz> wrote:
>>> Hi,
>>>
>>> We are running Kafka 0.10.0.X, with ZooKeeper. I'm trying to figure out if I can manually set a consumer offset, for a specific consumer, when that consumer is stopped.
>>>
>>> It looks like it used to be done using kafka.tools.ExportZkOffsets and kafka.tools.ImportZkOffsets (https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-ConsumerOffsetChecker)
>>>
>>> However, in my version they don't work, because they try to read from the ZooKeeper /consumers path, which is empty. I think they are old tools.
>>>
>>> Does anyone know where the current Kafka keeps consumer offsets in ZooKeeper?
>>>
>>> Regards
>>> --
>>> Glen Ogilvie
>>> Open Systems Specialists
>>> Level 1, 162 Grafton Road
>>> http://www.oss.co.nz/
>>>
>>> Ph: +64 9 984 3000
>>> Mobile: +64 21 684 146
>>> GPG Key: ACED9C17

--
Robert Quinlivan
Software Engineer, Signal
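[Editor's note: the sketch referenced above — a throwaway consumer that joins the existing group (assuming the real consumers are stopped, it receives the full assignment) and commits an explicit offset. The group name, topic, and offset are placeholders:]

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class GroupOffsetResetTool {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "logstash_group"); // same group name the real consumers use
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my_topic"));
            consumer.poll(0); // triggers the group join and partition assignment

            Map<TopicPartition, OffsetAndMetadata> target = new HashMap<>();
            for (TopicPartition tp : consumer.assignment()) {
                target.put(tp, new OffsetAndMetadata(12345678L)); // desired offset
            }
            consumer.commitSync(target); // the group resumes from here on restart
        }
    }
}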
Consumer group stuck rebalancing with RecordTooLargeException
Hello,

I have a consumer group that is continually stuck in a rebalance with the following error being produced in the broker logs:

[2017-03-07 22:16:20,473] ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for group tagfire_application generation 951 failed due to org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN error code to the client (kafka.coordinator.GroupMetadataManager)

The consumer group cannot attach. How can I resolve this issue on the broker?

Thanks

--
Robert Quinlivan
Software Engineer, Signal
Messages rejected by broker due to size
Hello,

I'm looking for a bit of clarification on the outcome of a RecordTooLargeException in the broker that was not evident from the documentation.

First, what happens to a large record that is rejected? Is it ignored by the broker or is there a way to recover a rejected message?

Second, is the message size condition checked before or after compression/batching is applied? What exactly is "message.max.bytes" looking at?

Third, what happens if a broker accepts the record from the producer but the "replica.fetch.max.bytes" setting is lower than the "message.max.bytes" setting? This seems like an edge case to me. The leader would accept the record but replicas would not be able to receive it, so it would be lost. Or does the replica take the max of those two settings in order to avoid this condition?

Thanks in advance!

--
Robert Quinlivan
Software Engineer, Signal
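[Editor's note on the third question: the broker documentation recommends keeping replica.fetch.max.bytes at least as large as message.max.bytes, precisely so that any record the leader accepts can also be fetched by the replicas. A sketch of consistent settings; the values shown are the 0.10-era defaults, used here as placeholders:]

# server.properties
message.max.bytes=1000012
# Keep this >= message.max.bytes so replicas can always fetch an accepted record
replica.fetch.max.bytes=1048576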
Re: Kafka delay
Can you clarify what you mean by "process time"? Is this the time it takes for a message sent from the producer to appear in a consumer?

You may be seeing the result of batching. The producer batches messages, so if you are testing sending a single message at a time you will get different performance characteristics than if you are sending many.

On Fri, Jan 13, 2017 at 2:36 AM, Patricia Callejo <patricia.call...@imdea.org> wrote:
> Hi,
>
> we are developing a Kafka environment in order to be able to process up to hundreds of messages per second. The point is that if we send a lot of messages on different partitions, the processing time is very good, in ms; however, if we send just a single message, the minimum time required is very high, at least 1 or 2 seconds, even more.
>
> Is there any configuration we are missing to avoid this minimum delay? Or is this the minimum time required by Kafka for the whole process?
>
> Best Regards,
> Patricia

--
Robert Quinlivan
Software Engineer, Signal
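[Editor's note: a sketch for isolating producer-side latency, assuming a test topic. One hedged observation beyond the thread: the very first send() also blocks on a cluster metadata fetch, which by itself can account for a one-off delay of this size:]

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class SendLatencyProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("linger.ms", "0"); // don't hold a batch open waiting for more records

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.partitionsFor("test-topic"); // warm up the metadata cache first
            final long start = System.currentTimeMillis();
            producer.send(new ProducerRecord<>("test-topic", "hello"),
                    (metadata, exception) -> System.out.println(
                            "broker ack after " + (System.currentTimeMillis() - start) + " ms"));
            producer.flush(); // force the batch out immediately
        }
    }
}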
Metric meaning
Hello,

Are there more detailed descriptions available for the metrics exposed by Kafka via JMX? The current documentation provides some information, but a few metrics are not listed in detail – for example, "Log flush rate and time."

--
Robert Quinlivan
Software Engineer, Signal
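[Editor's note: short of better documentation, one can enumerate the metric MBeans directly over JMX. A sketch using the standard javax.management API, assuming the broker was started with JMX enabled (e.g. JMX_PORT=9999); the port is a placeholder:]

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.Set;

public class ListKafkaMbeans {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // List every MBean in the kafka.* domains, e.g.
            // kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs
            Set<ObjectName> names = mbs.queryNames(new ObjectName("kafka.*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        }
    }
}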
Multi-topic consumer configuration
Hello,

We are using a multi-topic approach for partitioning data. Each topic will follow a pattern naming convention. We want to design our consumer configuration so that a slow topic will not block a fast topic, as each topic will consume at a different rate. However, I am seeing an issue where all topics matching the pattern are assigned to a single consumer.

When there are several consumers in the group with a single topic with many partitions, the partitions are automatically distributed among the available consumers in the group. I expected that calling KafkaConsumer#subscribe(Pattern, ConsumerRebalanceListener) would follow a similar behavior by distributing the assigned topics among all consumers in the group. Is this not the case? What is the expected behavior, and how would you recommend implementing this design?

Thank you

--
Robert Quinlivan
Software Engineer, Signal
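[Editor's note, hedged and not from the thread itself: the default range assignor computes its assignment per topic, so a set of single-partition topics can all land on the first group member. Switching the group to round-robin assignment spreads topic-partitions across members. A sketch; the pattern, group, and broker address are placeholders:]

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

public class PatternSubscriber {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Every member of the group must be configured with the same assignor.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Pattern.compile("metrics\\..*"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("assigned: " + partitions);
            }
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
        });
        while (true) {
            consumer.poll(1000); // drives the rebalance and the fetch loop
        }
    }
}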
Issue with "stuck" consumer in 0.9 broker
I am running an 0.9 broker and I'm having trouble viewing and committing offsets. Upon starting up the broker, I see the following in the kafka.out log:

[2016-12-17 14:56:14,389] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)

I have one client consumer. I am using the new consumer. The consumer-groups tool reports this:

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
my_application

However, it throws an exception when checking the position of the my_application group:

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --new-consumer --describe --group my_application
Consumer group `my_application` does not exist or is rebalancing.

My suspicion is that the broker is stuck in a rebalance. Occasionally, I see the following in my client logs:

INFO 2016-12-17 09:06:05,410 org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to heart beat failed since the group is rebalancing, try to re-join group.
INFO 2016-12-17 09:06:05,430 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset 2013 is out of range, resetting offset
INFO 2016-12-17 09:06:05,430 org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset 30280 is out of range, resetting offset

which I'm guessing is the consumer timing out and attempting to rejoin, but failing to do so.

If it's relevant, my server properties file looks like this:

broker.id=0
auto.create.topics.enable=true
group.max.session.timeout.ms=30
default.replication.factor=1
offsets.topic.replication.factor=1
compression.type=snappy
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
log.dirs=/mnt/kafka-logs
num.partitions=1
delete.topic.enable=true
log.flush.interval.messages=1
log.flush.interval.ms=1000
log.retention.minutes=1
offsets.retention.minutes=86400
log.segment.bytes=486870912
log.retention.check.interval.ms=6
zookeeper.connect=localhost:2181/my_chroot
zookeeper.connection.timeout.ms=100
inter.broker.protocol.version=0.9.0.1
port=9092
offsets.topic.compression.codec=2

I'm having trouble understanding what is broken without any further information logged from the brokers. Is there a switch in the broker config that can provide more verbose logging, or is there another way of checking the offsets?

Thank you

--
Robert Quinlivan
Software Engineer, Signal