Re: c++ kafka
Check out librdkafka: https://github.com/edenhill/librdkafka

On Tue, Jan 29, 2019 at 07:02 Dor Ben Dov wrote:
> Hi,
> Can someone tell me what is the best C++ library or approach if I have a C++ process from which I want to write to Kafka?
>
> Thanks in advance,
> Dor

--
Emmett Butler | Senior Software Engineer
PyKafka 2.8.0 released
Hi everyone,

Yesterday Parse.ly released PyKafka <https://github.com/Parsely/pykafka> 2.8.0, which has been in the works since January. For those not familiar, PyKafka is a pure-Python client for Kafka that maintains a programmer-friendly API and offers optional integration with the librdkafka C client for enhanced performance. Check out the release notes for 2.8.0 <https://github.com/Parsely/pykafka/releases/tag/2.8.0>, and install PyKafka with pip install pykafka.

Here's a high-level overview of some of the most exciting changes in this release:

- Support for user-defined SerDes to simplify the handling of custom message payload formats
- Decoupling of the "committed" and "consumed" concepts to allow granular management of offsets in client code
- Implementation of non-busy waiting in the Producer
- Tons of bugfixes

As always, please feel free to reach out with questions or comments about PyKafka via the mailing list or the GitHub repo.

--
Emmett Butler | Senior Software Engineer
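The user-defined SerDe feature can be illustrated with a plain serializer/deserializer pair like the one below. Note this is a sketch: the (payload, partition_key) signature and the exact keyword names pykafka 2.8.0 expects are assumptions here, not verified against the release, so check the release notes before copying.

```python
import json

# A minimal JSON SerDe pair, usable without a broker. The idea is that
# application code deals in dicts while the client deals in bytes.
def json_serializer(payload, partition_key):
    # Assumed pykafka shape: receives (payload, partition_key) and returns
    # the same tuple with the payload encoded to bytes.
    return json.dumps(payload).encode("utf-8"), partition_key

def json_deserializer(payload, partition_key):
    return json.loads(payload.decode("utf-8")), partition_key

# Round-trip sanity check, independent of Kafka
encoded, key = json_serializer({"time": 123.4, "value": 7}, b"my-key")
decoded, _ = json_deserializer(encoded, key)
assert decoded == {"time": 123.4, "value": 7}
```

With a running broker, these would then be wired into the producer and consumer creation calls (parameter names assumed, e.g. topic.get_producer(serializer=json_serializer)).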
Re: Big Log Retention
In the particular case I'm referring to, I believe some partitions were replicated to new nodes. These new nodes had been added to the cluster as a result of human error, and were removed shortly thereafter. My hypothesis is that this left many partitions with replicas on nodes that were no longer in the cluster and were never expected to rejoin. Maybe this was a case in which the partition reassignment CLI tool would have been useful?

On Thu, Sep 6, 2018 at 11:09 PM Brett Rann wrote:
> We have partitions that are in the 100s of GBs.
>
> It shouldn't have to shuffle around GB chunks of data unless you have done partition moves, or had a broker offline for a while. Is that the case?
>
> If not, your ISR problem is probably related to something other than retention size.
>
> On Fri, Sep 7, 2018 at 7:25 AM Emmett Butler wrote:
> > Hi users,
> >
> > What's the biggest topic you've seen in production use? The factors I'm interested in are log.retention.* config parameters and number of partitions.
> >
> > We run several 200-partition topics that have log retention set to roughly 1GB per partition. Is this big by common usage standards, or not so big?
> >
> > I ask because we've had some situations in which ISRs appeared to take an inordinately long time to synchronize, and I'm guessing this is partially related to the need to shuttle around 1GB hunks of data.
> >
> > --
> > Emmett Butler | Senior Software Engineer

--
Emmett Butler | Senior Software Engineer
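For reference, moving replicas off dead brokers is done with kafka-reassign-partitions.sh, which takes a JSON file describing the desired replica placement. The sketch below builds such a file; the topic name, partition numbers, and broker IDs are made up for illustration:

```python
import json

# Hypothetical scenario: some replicas ended up on brokers that left the
# cluster and should be placed back onto the live brokers below.
live_brokers = [12, 13, 14]

def reassignment_plan(topic, partitions, replication_factor):
    """Build the JSON payload consumed by kafka-reassign-partitions.sh."""
    plan = {"version": 1, "partitions": []}
    for p in partitions:
        # Simple round-robin placement over the live brokers
        replicas = [live_brokers[(p + i) % len(live_brokers)]
                    for i in range(replication_factor)]
        plan["partitions"].append(
            {"topic": topic, "partition": p, "replicas": replicas})
    return plan

plan = reassignment_plan("my.cool.topic", [0, 1], replication_factor=3)
print(json.dumps(plan, indent=2))
# The resulting file would then be applied with something like:
#   kafka-reassign-partitions.sh --zookeeper <zk> \
#       --reassignment-json-file plan.json --execute
```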
Big Log Retention
Hi users,

What's the biggest topic you've seen in production use? The factors I'm interested in are log.retention.* config parameters and number of partitions.

We run several 200-partition topics that have log retention set to roughly 1GB per partition. Is this big by common usage standards, or not so big?

I ask because we've had some situations in which ISRs appeared to take an inordinately long time to synchronize, and I'm guessing this is partially related to the need to shuttle around 1GB hunks of data.

--
Emmett Butler | Senior Software Engineer
Fwd: coordinator load + OffsetFetchRequest error = consumption failure
Hi Kafka users,

tl;dr questions:

1. Is it normal or expected for the coordinator load state to last for 6 hours? Is this load time affected by log retention settings, message production rate, or other parameters?
2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming only from the non-erroring partitions? Pykafka's insistence that all partitions return successful OffsetFetchResponses can be a source of consumption downtime.
3. Why does Kafka sometimes select a non-synced replica as the preferred replica during coordinator loads? How can I reassign partition leaders to replicas in the ISR?
4. Are all of these questions moot because I should just be using a newer version of Kafka than 0.8.2.1?

I'm using Kafka 0.8.2.1, running a topic with 200 partitions and RF=3, with log retention set to about 1GB. An unknown event caused the cluster to enter the "coordinator load" or "group load" state. A few signals made this apparent: the pykafka-based consumers began to fail <https://github.com/Parsely/pykafka/blob/858554029830e15cfa6d15df002d1772672d8ee0/pykafka/simpleconsumer.py#L643> during OffsetFetchRequests with error code 14 COORDINATOR_LOAD_IN_PROGRESS for some subset of partitions. These errors were triggered when consuming with a consumer group that had existed since before the coordinator load. In broker logs, messages like this appeared:

[2018-05...] ERROR Controller 17 epoch 20 initiated state change for partition [my.cool.topic,144] from OnlinePartition to OnlinePartition failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing leader for partition [my.cool.topic,144] due to: Preferred replica 11 for partition [my.cool.topic,144] is either not alive or not in the isr. Current leader and ISR: [{"leader":12,"leader_epoch":7,"isr":[12,13]}].

For some reason, Kafka decided that replica 11 was the "preferred" replica despite the fact that it was not in the ISR.
To my knowledge, consumption *could* have continued uninterrupted from either replica 12 or 13 while 11 resynchronized - it's not clear why Kafka chose a non-synced replica as the preferred leader.

The above-described behavior lasted for about 6 hours, during which the pykafka fetch_offsets error made message consumption impossible. While the coordinator load was still in progress, other consumer groups were able to consume the topic without error. In fact, the eventual fix was to restart the broken consumers with a new consumer_group name.

Questions:

1. Is it normal or expected for the coordinator load state to last for 6 hours? Is this load time affected by log retention settings, message production rate, or other parameters?
2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming only from the non-erroring partitions? Pykafka's insistence that all partitions return successful OffsetFetchResponses can be a source of consumption downtime.
3. Why does Kafka sometimes select a non-synced replica as the preferred replica during coordinator loads? How can I reassign partition leaders to replicas in the ISR?
4. Are all of these questions moot because I should just be using a newer version of Kafka?

Thanks for your help, and please let me know if I can provide additional information or answer additional questions.
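The partial-consumption behavior asked about in question 2 might look like the following sketch, which retries offset fetches only for partitions that returned error 14 while letting healthy partitions proceed. The response shape here is a simplified stand-in, not pykafka's actual internal API:

```python
COORDINATOR_LOAD_IN_PROGRESS = 14  # Kafka protocol error code

def fetch_offsets_partial(fetch_fn, partitions, max_retries=3):
    """Fetch committed offsets, tolerating error 14 on a subset of partitions.

    fetch_fn(partitions) -> {partition_id: (offset, error_code)} is a
    stand-in for an OffsetFetchRequest round trip (hypothetical shape).
    Partitions that keep erroring are returned separately so the caller
    can start consuming the healthy ones immediately.
    """
    healthy, pending = {}, list(partitions)
    for _ in range(max_retries):
        if not pending:
            break
        response = fetch_fn(pending)
        pending = []
        for partition_id, (offset, error_code) in response.items():
            if error_code == COORDINATOR_LOAD_IN_PROGRESS:
                pending.append(partition_id)  # retry only this partition
            else:
                healthy[partition_id] = offset
    return healthy, pending

# Simulated broker: partition 144 is mid-coordinator-load, others are fine
def fake_fetch(parts):
    return {p: ((0, 14) if p == 144 else (1000 + p, 0)) for p in parts}

ok, still_loading = fetch_offsets_partial(fake_fetch, [0, 1, 144])
```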
Broker config options:

broker.id=10
port=9092
zookeeper.connect=/kafka5
log.dirs=*
delete.topic.enable=true
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=3
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=1
replica.lag.max.messages=4000
controller.socket.timeout.ms=3
message.max.bytes=100
auto.create.topics.enable=false
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=96
log.roll.hours=168
log.retention.check.interval.ms=30
log.segment.bytes=1073741824
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
num.io.threads=8
socket.request.max.bytes=104857600
num.replica.fetchers=4
controller.message.queue.size=10
num.partitions=8
log.flush.interval.ms=6
log.flush.interval.messages=6
log.flush.scheduler.interval.ms=2000
num.network.threads=8
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
controlled.shutdown.enable=true

--
Emmett Butler | Senior Software Engineer
Re: Ordering in a Kafka Pipe line
Kafka provides total ordering only within individual partitions. A topic with multiple partitions is a "partial order", in which multiple subsets of the topic are well-ordered but the topic as a whole is not. The tradeoff between scalability via partitioning and message ordering is fundamental to Kafka's design and unfortunately makes it difficult to build Kafka-based pipelines that rely on strict ordering.

Partition keys are used to ensure that messages sharing a certain characteristic always go to the same partition. It's possible that you could use this to your advantage given the ordering guarantees provided by individual partitions, but you'd still be limited in your ability to scale beyond a single partition for any given partition key.

On Fri, Jul 6, 2018 at 10:15 AM chinchu chinchu wrote:
> Hello,
> We are building a data pipeline with the following semantics. We need to maintain order until the last unit of work is done in this pipeline. We cannot have a single partition, since that loses our ability to scale. We looked at using partitioning keys, but does that guarantee order in the pipeline as such, as opposed to between a topic and consumers?
>
> produces -> consumes, enriches, and produces -> consumes, does work, and produces -> persistent store
>
> Thanks,
> Chinchu

--
Emmett Butler | Senior Software Engineer
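The partition-key behavior described above can be modeled with a simple hash partitioner. This is an illustration of the concept only; real Kafka clients use murmur2- or crc32-based partitioners, and the hash below is just a stand-in:

```python
import hashlib

def partition_for(key: bytes, n_partitions: int) -> int:
    """Deterministically map a key to a partition (illustrative stand-in;
    real Kafka clients hash keys with murmur2 or similar)."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % n_partitions

# All messages for a given key land on one partition, so they stay ordered
# relative to each other -- but that single partition is also the
# throughput ceiling for that key.
p1 = partition_for(b"user-42", 200)
p2 = partition_for(b"user-42", 200)
assert p1 == p2  # same key, same partition, every time
```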
Re: [pykafka-user] Re: Timestamp based reset_offset fails with OffsetOutOfRangeError
Thanks! I ask because I think it's possible that only having a single log segment (as your partition does) hamstrings the functionality of reset_offsets(). I haven't verified this experimentally, but I think it's possible. Maybe someone on the Kafka users group has insight.

On Tue, Jun 19, 2018 at 1:55 AM, Moe wrote:
> Thanks for getting back to me so speedily. To be frank, I do not know how log segments differ from logs.
>
> # This is a view into my logs folder:
>
> # And this is the view into that folder for my offset_retrieval_test.001 partition (the topic in question):
>
> Please let me know if this isn't what you were asking for! The topic has been auto-created, so all settings are default (7 days retention if my memory serves me right, but I'm sure you know that better than me).
>
> I appreciate the time and effort,
> Moritz
>
> 2018-06-18 18:27 GMT+02:00 Emmett Butler:
>> How many log segments does this partition have, and what are the retention settings for the topic? You can find the log segments by looking in the directory pointed to by the log.dirs server configuration.
>>
>> On Mon, Jun 18, 2018 at 7:43 AM, Moe wrote:
>>> # Sorry, I should have mentioned that the timestamp I use is one I read from the server GUI
>>>
>>> Am Montag, 18. Juni 2018 16:39:50 UTC+2 schrieb Moe:
>>>> Hi,
>>>>
>>>> I'm struggling to implement a time(stamp)-based reset_offset and am hoping someone here can show me the light. In essence, when I call reset_offset with a timestamp I get "OffsetOutOfRangeError".
>>>>
>>>> # Setup:
>>>> - Kafka: 1.1.0 (also tried with 1.0.0) running in Docker (wurstmeister/kafka image)
>>>> - Consumer/Producer: Ubuntu 17.04, pykafka 2.7.0, Python 3.6
>>>>
>>>> # Kafka connection
>>>> client = KafkaClient(hosts='10.25.42.127:9092', broker_version="1.0.0")
>>>> topic = client.topics['offset_retrieval_test.001'.encode()]
>>>>
>>>> # Producer
>>>> delivery_reports = False
>>>> auto_start = True
>>>> linger_ms = 10.0
>>>>
>>>> ksp = topic.get_producer(delivery_reports=delivery_reports, linger_ms=linger_ms, auto_start=auto_start)
>>>>
>>>> for i in range(100):
>>>>     dict_obj = {
>>>>         'time': time.time(),
>>>>         'value': i
>>>>     }
>>>>     data = json.dumps(dict_obj)
>>>>     ksp.produce(data.encode())
>>>>     time.sleep(0.25)
>>>>
>>>> # This results in data being written to my Kafka server as follows:
>>>>
>>>> # But when I call reset_offsets such as:
>>>> offset_ts = int(time.mktime(time.strptime('2018-06-18 15:33:22', '%Y-%m-%d %H:%M:%S'))*1000)
>>>> consumer.reset_offsets([(topic.partitions[0], offset_ts)])
>>>>
>>>> while True:
>>>>     message = consumer.consume()
>>>>     dict_obj = json.loads(message.value)
>>>>     print(dict_obj)
>>>>
>>>> # I get the below error and a reset to 0
>>>>
>>>> # Potentially related; printing the message timestamp (as below) results in 0 being printed
>>>>
>>>> while True:
>>>>     message = consumer.consume()
>>>>     dict_obj = json.loads(message.value)
>>>>     print(message.timestamp)
>>>>
>>>> Thanks a million,
>>>> Moritz
>>>
>>> --
>>> You received this message because you are subscribed to the Google Groups "pykafka-user" group.
>>> To unsubscribe from this group and stop receiving emails from it, send an email to pykafka-user+unsubscr...@googlegroups.com.
>>> To post to this group, send email to pykafka-u...@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/pykafka-user.
>>> To view this discussion on the web visit https://groups.google.com/d/msgid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40googlegroups.com
Re: Question regarding offset commits by Consumers - docs
From the docs <https://kafka.apache.org/documentation/#impl_offsettracking>:

> The brokers periodically compact the offsets topic since it only needs to maintain the most recent offset commit per partition

Offsets are maintained per partition per consumer group, so it doesn't matter which member of a consumer group is reading a given partition - the offset will remain consistent.

On Wed, May 30, 2018 at 9:23 AM, M. Manna wrote:
> Hello,
>
> I was trying to remember from the docs (it's been a while) how the last committed offsets work, i.e. whether they're tracked per consumer group or something else. This is specific to when auto.offset.reset is set to "earliest"/"latest" and the last committed offset is determined. For example:
>
> 1) C0, C1, C2 are subscribed to topics t0 and t1 - each topic has 3 partitions
> 2) Using range assignment - C0 [t0p0,t1p0], C1 [t0p1,t1p1], and C2 [t0p2,t1p2]
> 3) Now C2 dies, leaves, or "deactivates" - a rebalance occurs
> 4) After the rebalance - C0 [t0p0, t0p1, t1p0, t1p1] and C1 [t0p2, t1p2] are the new assignments
>
> Assuming that C0, C1, C2 are all part of the *same* consumer group, and C2 successfully committed offset #6 (t0p2 and t1p2) before leaving, will #6 be considered the latest committed offset for that consumer group? In other words, will reassignment of that partition to any existing consumer under that group acknowledge #6 as the latest offset?
>
> If someone could quote the docs, it'll be appreciated. Meanwhile, I will try to figure out something from the code if possible.
>
> Thanks,

--
Emmett Butler | Senior Software Engineer
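A toy model of this bookkeeping makes the answer concrete: commits are keyed by (group, topic, partition), not by consumer, so C2's commit survives the rebalance and whichever consumer inherits the partition resumes from it. This sketch is purely illustrative:

```python
# Offsets are stored per (consumer_group, topic, partition) -- the identity
# of the committing consumer is irrelevant to where the offset lives.
committed = {}

def commit(group, topic, partition, offset):
    committed[(group, topic, partition)] = offset

def last_committed(group, topic, partition):
    return committed.get((group, topic, partition))

# C2 commits offset 6 on t0p2, then leaves the group
commit("my-group", "t0", 2, 6)

# After the rebalance, C1 now owns t0p2 and resumes from the same offset
assert last_committed("my-group", "t0", 2) == 6
```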
Re: ListOffsets parameters
Wow, thanks so much for the detailed explanation. I've verified that I can replicate your results, and adapted it for inclusion in pykafka's documentation, since this API is a constant source of confusion for users. https://github.com/Parsely/pykafka/commit/340f8e16c8b0d9830b6fd889f3be7570015ba3eb On Thu, Apr 12, 2018 at 1:19 PM, Andras Beni <andrasb...@cloudera.com> wrote: > Hi Emmett, > > ListOffsets API tells you about the log segments belonging to (the given > partitions of) a topic. > I think I better explain how it behaves by example. > I have a topic called 'test2' with three partitions (0..2). I produced 2665 > messages to its partition 0. I set up the topic so that it rolls a new > segment after around 530 messages. > My two oldest segments were deleted, so now in my kafka/data/test2-0 > directory I have these files: > /var/local/kafka/data/test2-0/1059.log > /var/local/kafka/data/test2-0/2119.log > /var/local/kafka/data/test2-0/1589.log > /var/local/kafka/data/test2-0/2649.log > Let's say I call ListOffset API version 0 now with following parameters > Topic: test2, partition: 0, timestamp: -1 ( == latest), numberOfOffsets : > 100 > This will result in an offset list of [2665,2649,2119,1589,1059] One item > for each first offset in a file and the last offset (but not more than 100 > items) > If I set numberOfOffsets to 3, I only get 3 items: [2665,2649,2119] > Now let's say I use timestamp -2 ( == oldest) this way I only get back the > oldest offset [1059] regardless of numberOfOffsets. > Using a real timestamp (e.g. 152355948) will show me offsets that were > appended to the log before that time (based on last modified date of the > file). > If the timestamp means a moment before the oldest message, the returned > array will be empty. 
> > In version 1 and 2 request and response have changed: > If timestamp == -1, the latest offset is returned (except if isolation > level is read committed, because then the latest stable offset is returned) > if timestamp == -2, the oldest offset is returned > if timestamp is a real time value, the log is searched for the last offset > whose timestamp is smaller than given timestamp. > > I hope this helps, > Andras > > > > On Mon, Mar 26, 2018 at 6:33 PM, Emmett Butler <emm...@parsely.com> wrote: > > > Hi users, > > > > I'm the maintainer of the PyKafka <https://github.com/parsely/pykafka> > > library and I'm working on improving its support for the ListOffsets API. > > Some questions: > > > > Kafka version: 1.0.0 > > I'm using this documentation > > <https://kafka.apache.org/protocol#The_Messages_ListOffsets> for > > reference. > > > > In some cases, ListOffsets requests return an empty array of offsets. Is > > this expected behavior? If so, when? > > > > What format is the Timestamp parameter expected in? I've been using > > milliseconds since epoch (python: time.time() * 1000), but I haven't > > managed to get a response that isn't either [0] or [] using this > approach. > > Could this have to do with the number of log segments on my topic, or the > > presence of a time index? How do I make a request including a Timestamp > > (not special values -1 or -2) that returns a valid offset? What is the > > meaning of a [0] response in this context? > > > > What is the MaxNumberOfOffsets parameter supposed to do? When I request > max > > 10 offsets, I get back [12345, 0] (two offsets). Again, could this have > to > > do with the number of log segments on my topic? > > > > Related PyKafka issue tickets for reference: > > https://github.com/Parsely/pykafka/issues/728 > > https://github.com/Parsely/pykafka/issues/733 > > > > Thanks for your help. 
> >
> > --
> > Emmett Butler | Software Engineer

--
Emmett Butler | Software Engineer
<http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>
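[Editor's note: the v1+ real-timestamp behavior described above follows KIP-79, which specifies returning the earliest offset whose message timestamp is greater than or equal to the target, or no offset if none qualifies. A minimal sketch of that lookup over (offset, timestamp) pairs — purely illustrative, not broker code:]

```python
import bisect

def offset_for_timestamp(records, target_ts_ms):
    """Return the earliest offset whose timestamp is >= target_ts_ms, else None.

    records: (offset, timestamp_ms) pairs with timestamps ascending,
    mirroring how a time-indexed log segment is searched.
    """
    timestamps = [ts for _, ts in records]
    i = bisect.bisect_left(timestamps, target_ts_ms)
    return records[i][0] if i < len(records) else None

log = [(0, 1522000000000), (1, 1522000005000), (2, 1522000010000)]
print(offset_for_timestamp(log, 1522000004000))  # 1
print(offset_for_timestamp(log, 1522000010000))  # 2
print(offset_for_timestamp(log, 1522999999999))  # None (target is after the last message)
```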
ListOffsets parameters
Hi users,

I'm the maintainer of the PyKafka <https://github.com/parsely/pykafka> library and I'm working on improving its support for the ListOffsets API. Some questions:

Kafka version: 1.0.0
I'm using this documentation <https://kafka.apache.org/protocol#The_Messages_ListOffsets> for reference.

In some cases, ListOffsets requests return an empty array of offsets. Is this expected behavior? If so, when?

What format is the Timestamp parameter expected in? I've been using milliseconds since epoch (python: time.time() * 1000), but I haven't managed to get a response that isn't either [0] or [] using this approach. Could this have to do with the number of log segments on my topic, or the presence of a time index? How do I make a request including a Timestamp (not the special values -1 or -2) that returns a valid offset? What is the meaning of a [0] response in this context?

What is the MaxNumberOfOffsets parameter supposed to do? When I request max 10 offsets, I get back [12345, 0] (two offsets). Again, could this have to do with the number of log segments on my topic?

Related PyKafka issue tickets for reference:
https://github.com/Parsely/pykafka/issues/728
https://github.com/Parsely/pykafka/issues/733

Thanks for your help.

--
Emmett Butler | Software Engineer
<http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>
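[Editor's note: on the Timestamp format question above — the protocol field is a signed 64-bit integer of milliseconds since the Unix epoch, and `time.time() * 1000` in Python yields a float, so it needs explicit truncation before being packed into Kafka's big-endian wire format. A minimal sketch:]

```python
import struct
import time

# ListOffsets expects Timestamp as an int64 of milliseconds since the
# Unix epoch; time.time() returns float seconds, so truncate explicitly.
ts_ms = int(time.time() * 1000)

# Kafka's wire format is big-endian, so the field packs as '>q'.
packed = struct.pack(">q", ts_ms)
assert len(packed) == 8
assert struct.unpack(">q", packed)[0] == ts_ms
```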
Clients list
Hi Kafka users,

I maintain the PyKafka <https://github.com/Parsely/pykafka> Python client library, and the PyKafka listing on the clients page <https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python> is outdated. I made a Confluence account, but I don't see a way to edit the page. How can I get edit access so I can keep PyKafka's listing up to date?

--
Emmett Butler | Software Engineer
<http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>