It looks like that's only applicable to the SyncProducer? Also, am I right in assuming that the upcoming (0.7) compression functionality will be on-the-wire compression?
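For concreteness, here's a minimal sketch of where I'd expect that property to go on our side. The zk.connect/serializer.class keys and the ~1 MB value are placeholders in 0.6/0.7-era style rather than our real settings, so please correct me if max.message.size isn't the right knob here.

    import java.util.Properties;

    public class ProducerProps {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Messages larger than this will not be sent, per Jun's note below;
            // ~1 MB is just an example value.
            props.put("max.message.size", String.valueOf(1024 * 1024));
            // Placeholder connection/serializer settings; adjust for your version.
            props.put("zk.connect", "zk1:2181,zk2:2181");
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");
            // These would then be handed to kafka.producer.ProducerConfig and
            // on to the Producer (or SyncProducer) constructor.
            System.out.println(props);
        }
    }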
Thanks,

-Blake

On Sat, Aug 13, 2011 at 11:27 PM, Jun Rao <jun...@gmail.com> wrote:
> Blake,
>
> Once an unexpectedly large message hits the broker, it's going to be hard
> for the consumer to automatically recover from this error. The best way is
> to prevent such a message from being produced in the first place. There is
> a config property max.message.size in the producer that controls the size
> of the largest allowed message. Messages exceeding that size will not be
> sent.
>
> Thanks,
>
> Jun
>
> On Sat, Aug 13, 2011 at 8:14 PM, Blake Matheny <bl...@tumblr.com> wrote:
>
>> Thanks for clarifying things Jay, makes sense. It was indeed a 'bug'.
>> Once I looked at the logs (oh yeah, those things), I found:
>>
>> ERROR [20110813-21:50:50.227] consumer.FetcherRunnable: error in
>> FetcherRunnable
>> kafka.common.InvalidMessageSizeException: invalid message size:488145
>> only received bytes:307196 at 0 possible causes (1) a single message
>> larger than the fetch size; (2) log corruption
>> INFO [20110813-21:50:50.228] consumer.FetcherRunnable: stopping
>> fetcher FetchRunnable-0 to host 10.60.26.38
>> com.tumblr.motherboy.workers.UpdatePostWorker@1d573137: caught
>> kafka.common.InvalidMessageSizeException
>>
>> After increasing the fetch.size the consumer continued on and is now
>> catching up. For us this is safe since our codec would catch any
>> corruption in the message, but it brings up a question. What is the
>> right way to deal with this? Catch it and skip the message? Can you
>> safely update the offset in zookeeper to force the consumer to move
>> ahead?
>>
>> Thanks,
>>
>> -Blake
>>
>> On Sat, Aug 13, 2011 at 10:57 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> > Hi Blake,
>> >
>> > Yes, as you say the num.partitions is the number of partitions per node.
>> > So as you say, 5 topics with 5 partitions each will have 25 total topic
>> > partitions per node for a total of 50 partitions.
>> >
>> > Partitions can be merged for consumption but not split, which means
>> > across all consumer machines/processes you can have as few as 1 or as
>> > many as 50 active consumers (if you have more than 50, the extras won't
>> > get any data). Provided this is the case, and you have an active thread
>> > consuming off of each of the 5 topics, you should see offset updates on
>> > each topic appearing in zookeeper for all the partitions. Be aware that
>> > data is fetched in chunks from each partition and those partitions are
>> > processed sequentially so the updates will not be continual on all
>> > partitions.
>> >
>> > What you are describing sounds like a bug somewhere. Can you turn on
>> > debug logging in the consumers and add logging to make sure all your
>> > threads are really getting data and consuming?
>> >
>> > -Jay
>> >
>> > On Sat, Aug 13, 2011 at 7:39 PM, Blake Matheny <bl...@tumblr.com> wrote:
>> >
>> >> Our current setup:
>> >>
>> >> 2 brokers, each with num.partitions set to 5
>> >> n producers, publishing to 5 topics
>> >> 5 consumers
>> >> All in same consumer group
>> >> Each is consuming from all 5 topics
>> >> Each is reading from 2 KafkaMessageStream's
>> >> Custom Partitioner, provides uniform distribution
>> >>
>> >> Having read the recently recommended Kafka paper that describes some
>> >> of the partitioning semantics, I have a few questions wrt the above
>> >> setup.
>> >>
>> >> First, the way the ZK info for the brokers reads, it looks like setting
>> >> num.partitions to 5 on each broker has actually created 10 total
>> >> partitions, 5 on each broker; is that correct?
>> >> Second, with 5 topics, 5 partitions, and 2 brokers, does that give you
>> >> 50 distinct message streams? I understand that a consumer can pull
>> >> from more than one partition, but assuming you would like to map a
>> >> single topic/partition to each consumer, would you in the above setup
>> >> want to run 50 consumers?
>> >> Lastly, I'm seeing updates to the log files on the second broker
>> >> (/tmp/kafka-logs/[topic]-[partition-id]/[logfile].kafka is growing),
>> >> but the corresponding offset znode isn't being updated by the
>> >> consumer. The same consumer is updating the offset for the same topic,
>> >> different partition/consumer just fine (which leads me to believe the
>> >> consumer is working properly). Is there something in the above
>> >> described config that sounds incorrect? I'm wondering if there is a
>> >> bug (in my code or elsewhere) when a consumer is reading from two
>> >> partitions on the same topic across more than one broker. Just
>> >> guessing though.
>> >>
>> >> Thanks in advance,
>> >>
>> >> -Blake
>> >>
>> >> --
>> >> Blake Matheny
>> >
>>
>> --
>> Blake Matheny
>>

--
Blake Matheny
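
P.S. For anyone else who hits the InvalidMessageSizeException quoted above:
the consumer-side change that got us moving again was simply raising
fetch.size above the largest message we expect to see. A rough sketch of the
relevant properties is below; the groupid/zk.connect keys and all of the
values are placeholders in 0.6/0.7-era style, not our production settings,
so double-check them against ConsumerConfig in your version.

    import java.util.Properties;

    public class ConsumerProps {
        public static void main(String[] args) {
            Properties props = new Properties();
            // fetch.size has to be at least as large as the biggest message in
            // the log; otherwise the fetch returns a truncated message and the
            // iterator throws InvalidMessageSizeException. 2 MB is just an
            // example value.
            props.put("fetch.size", String.valueOf(2 * 1024 * 1024));
            // Placeholder group/connection settings.
            props.put("groupid", "update-post-workers");
            props.put("zk.connect", "zk1:2181,zk2:2181");
            // These would then be handed to kafka.consumer.ConsumerConfig and
            // Consumer.createJavaConsumerConnector(...).
            System.out.println(props);
        }
    }

If skipping a bad message by hand ever does become necessary, I believe the
consumer offsets live under /consumers/[group]/offsets/[topic] in ZooKeeper,
but I'd want confirmation from the list before editing those znodes directly.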