Re: c++ kafka

2019-01-29 Thread Emmett Butler
Check out librdkafka
https://github.com/edenhill/librdkafka

On Tue, Jan 29, 2019 at 07:02 Dor Ben Dov  wrote:

> Hi,
> Can anyone tell me the best C++ library or approach to use if I have
> a C++ process that I want to write to Kafka?
>
>
> Thanks in advance,
> Dor
>
-- 
Emmett Butler | Senior Software Engineer


PyKafka 2.8.0 released

2018-09-25 Thread Emmett Butler
Hi everyone,

Yesterday Parse.ly released PyKafka <https://github.com/Parsely/pykafka>
2.8.0, which has been in the works since January. For those not familiar,
PyKafka is a pure-Python client for Kafka that maintains a
programmer-friendly API and offers optional integration with the librdkafka
C client for enhanced performance. Check out the release notes for 2.8.0
<https://github.com/Parsely/pykafka/releases/tag/2.8.0>, and install
PyKafka with pip install pykafka.

Here's a high-level overview of some of the most exciting changes in this
release:

   - Support for user-defined SerDes to simplify the handling of custom
   message payload formats (see the sketch after this list)
   - Decoupling of the "committed" and "consumed" concepts to allow
   granular management of offsets in client code
   - Implementation of non-busy waiting in the Producer
   - Tons of bugfixes
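
To illustrate the SerDes support, here is a minimal sketch. It assumes the
2.8.0 serializer/deserializer hooks are callables of the form
f(payload, partition_key) -> (payload, partition_key) passed to
get_producer() and get_simple_consumer(); the broker address and topic name
are hypothetical, and the release notes are the authority on exact signatures.

import json

from pykafka import KafkaClient

def json_serializer(payload, partition_key):
    # Turn a Python object into the bytes that go over the wire
    return json.dumps(payload).encode("utf-8"), partition_key

def json_deserializer(payload, partition_key):
    # Turn raw message bytes back into a Python object on consumption
    return json.loads(payload.decode("utf-8")), partition_key

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b"serde-example"]

with topic.get_producer(serializer=json_serializer) as producer:
    producer.produce({"event": "pageview", "count": 1})

consumer = topic.get_simple_consumer(deserializer=json_deserializer)
message = consumer.consume()
print(message.value)  # a dict rather than raw bytes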

As always, please feel free to reach out with questions or comments about
PyKafka via the mailing list or the GitHub repo.
-- 
Emmett Butler | Senior Software Engineer


Re: Big Log Retention

2018-09-18 Thread Emmett Butler
In the particular case I'm referring to, I believe some partitions were
replicated to new nodes. These new nodes had been added to the cluster as a
result of human error, and were removed shortly thereafter. It's my
hypothesis that this resulted in many partitions having their replicas on
nodes that were no longer in the cluster and were never expected to rejoin.

Maybe this was a case in which the partition reassignment CLI tool would
have been useful?

On Thu, Sep 6, 2018 at 11:09 PM Brett Rann 
wrote:

> We have partitions that are in the 100s of GBs.
>
> It shouldn't have to shuffle around GB chunks of data unless you have done
> partition moves, or had a broker offline for a while. Is that the case?
>
> If not, your ISR problem is probably related to something other than
> retention size.
>
> On Fri, Sep 7, 2018 at 7:25 AM Emmett Butler  wrote:
>
> > Hi users,
> >
> > What's the biggest topic you've seen in production use? The factors I'm
> > interested in are log.retention.* config parameters and number of
> > partitions.
> >
> > We run several 200-partition topics that have log retention set to
> roughly
> > 1GB per partition. Is this big by common usage standards, or not so big?
> >
> > I ask because we've had some situations in which ISRs appeared to take an
> > inordinately long time to synchronize, and I'm guessing this is partially
> > related to the need to shuttle around 1GB hunks of data.
> >
> > --
> > Emmett Butler | Senior Software Engineer
> >
>


-- 
Emmett Butler | Senior Software Engineer


Big Log Retention

2018-09-06 Thread Emmett Butler
Hi users,

What's the biggest topic you've seen in production use? The factors I'm
interested in are log.retention.* config parameters and number of
partitions.

We run several 200-partition topics that have log retention set to roughly
1GB per partition. Is this big by common usage standards, or not so big?

I ask because we've had some situations in which ISRs appeared to take an
inordinately long time to synchronize, and I'm guessing this is partially
related to the need to shuttle around 1GB hunks of data.

-- 
Emmett Butler | Senior Software Engineer


Fwd: coordinator load + OffsetFetchRequest error = consumption failure

2018-08-22 Thread Emmett Butler
Hi Kafka users,

*tl;dr questions:*

*1. Is it normal or expected for the coordinator load state to last for 6
hours? Is this load time affected by log retention settings, message
production rate, or other parameters?*
*2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming
only from the non-erroring partitions? Pykafka's insistence that all
partitions return successful OffsetFetchResponses can be a source of
consumption downtime.*
*3. Why does Kafka sometimes select a non-synced replica as the preferred
replica during coordinator loads? How can I reassign partition leaders to
replicas in the ISR?*
*4. Are all of these questions moot because I should just be using a newer
version of Kafka than 0.8.2.1?*


I'm using Kafka 0.8.2.1, running a topic with 200 partitions and RF=3, with
log retention set to about 1GB.

An unknown event caused the cluster to enter the "coordinator load" or
"group load" state. A few signals made this apparent: the pykafka-based
consumers began to fail
<https://github.com/Parsely/pykafka/blob/858554029830e15cfa6d15df002d1772672d8ee0/pykafka/simpleconsumer.py#L643>
during FetchOffsetRequests with error code 14 COORDINATOR_LOAD_IN_PROGRESS
for some subset of partitions. These errors were triggered when consuming
with a consumer group that had existed since before the coordinator load.
In broker logs, messages like this appeared:

[2018-05...] ERROR Controller 17 epoch 20 initiated state change for
partition [my.cool.topic,144] from OnlinePartition to OnlinePartition
failed (state.change.logger)
kafka.common.StateChangeFailedException: encountered error while electing
leader for partition [my.cool.topic,144] due to: Preferred replica 11 for
partition [my.cool.topic,144] is either not alive or not in the isr.
Current leader and ISR: [{"leader":12,"leader_epoch":7,"isr":[12,13]}].

For some reason, Kafka decided that replica 11 was the "preferred" replica
despite the fact that it was not in the ISR. To my knowledge, consumption
*could* continue uninterrupted from either replica 12 or 13 while 11
resynchronized - it's not clear why Kafka chose a non-synced replica as the
preferred leader.

The above-described behavior lasted for about 6 hours, during which the
pykafka fetch_offsets error made message consumption impossible. While the
coordinator load was still in progress, other consumer groups were able to
consume the topic without error. In fact, the eventual fix was to restart
the broken consumers with a new consumer_group name.

*Questions*

1. Is it normal or expected for the coordinator load state to last for 6
hours? Is this load time affected by log retention settings, message
production rate, or other parameters?
2. Do non-pykafka clients handle COORDINATOR_LOAD_IN_PROGRESS by consuming
only from the non-erroring partitions? Pykafka's insistence that all
partitions return successful OffsetFetchResponses can be a source of
consumption downtime.
3. Why does Kafka sometimes select a non-synced replica as the preferred
replica during coordinator loads? How can I reassign partition leaders to
replicas in the ISR?
4. Are all of these questions moot because I should just be using a newer
version of Kafka?

Thanks for your help, and please let me know if I can provide additional
information or answer additional questions.

Broker config options:

broker.id=10
port=9092
zookeeper.connect=/kafka5

log.dirs=*
delete.topic.enable=true
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=3
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=1
replica.lag.max.messages=4000
controller.socket.timeout.ms=3
message.max.bytes=100
auto.create.topics.enable=false
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=96
log.roll.hours=168
log.retention.check.interval.ms=30
log.segment.bytes=1073741824
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
num.io.threads=8
socket.request.max.bytes=104857600
num.replica.fetchers=4
controller.message.queue.size=10
num.partitions=8
log.flush.interval.ms=6
log.flush.interval.messages=6
log.flush.scheduler.interval.ms=2000
num.network.threads=8
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
controlled.shutdown.enable=true

-- 
Emmett Butler | Senior Software Engineer


Re: Ordering in a Kafka Pipe line

2018-07-06 Thread Emmett Butler
Kafka provides total ordering only within individual partitions. A topic
with multiple partitions is considered a "partial order", in which multiple
subsets of the topic are considered well-ordered but the topic as a whole
is not. The tradeoff between scalability via partitioning and message
ordering is fundamental to Kafka's design and unfortunately makes it
difficult to build Kafka-based pipelines that rely on strict ordering.

Partition keys are used to ensure that messages sharing a certain
characteristic will always go to the same partition. It's possible that you
could use this to your advantage given the ordering guarantees provided by
individual partitions, but you'd still be limited in your ability to scale
beyond a single partition for any given partition key.
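
To make that concrete, here is a minimal pykafka sketch (broker address,
topic name, and key are hypothetical): a hashing partitioner routes every
message carrying the same partition_key to the same partition, so those
messages stay totally ordered relative to one another.

from pykafka import KafkaClient
from pykafka.partitioners import hashing_partitioner

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b"pipeline-input"]

# hashing_partitioner hashes the partition key to pick a partition, so all
# "user-42" messages land on one partition and are consumed in produce order.
with topic.get_producer(partitioner=hashing_partitioner) as producer:
    for i in range(5):
        producer.produce("event {}".format(i).encode(),
                         partition_key=b"user-42")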

On Fri, Jul 6, 2018 at 10:15 AM chinchu chinchu 
wrote:

> Hello,
> We are building a data pipeline with the following semantics: we need to
> maintain order until the last unit of work is done in this pipeline. We
> cannot have a single partition, since that loses our ability to scale. We
> looked at using partitioning keys, but does that guarantee order in the
> pipeline as such, as opposed to between a topic and consumers?
>
> produces -> consumes, enriches, and produces -> consumes, does work, and
> produces -> persistent store
>
> Thanks,
> Chinchu
>


-- 
Emmett Butler | Senior Software Engineer


Re: [pykafka-user] Re: Timestamp based reset_offset fails with OffsetOutOfRangeError

2018-06-19 Thread Emmett Butler
Thanks! I ask because I suspect that having only a single log segment (as
your partition does) hamstrings the functionality of reset_offsets(). I
haven't verified this experimentally, but it seems plausible. Maybe someone
on the Kafka users group has insight.

On Tue, Jun 19, 2018 at 1:55 AM, Moe  wrote:

> Thanks for getting back to me so speedily. To be frank, I do not know how
> log segments differ from logs.
>
> *# This is a view into my logs folder:*
>
>
> *# And this is the view into that folder for my offset_retrieval_test.001
> partition (the topic in question):*
>
>
> Please let me know if this isn't what you were asking for! The topic has
> been auto created so all settings are default (7 days retention if my
> memory serves me right, but I'm sure you know that better than me).
>
> I appreciate the time and effort,
> Moritz
>
> 2018-06-18 18:27 GMT+02:00 Emmett Butler :
>
>> How many log segments does this partition have, and what are the
>> retention settings for the topic? You can find the log segments by looking
>> in the directory pointed to by the log.dirs server configuration.
>>
>> On Mon, Jun 18, 2018 at 7:43 AM, Moe  wrote:
>>
>>> *# Sorry, I should have mentioned that the timestamp I use is  one I
>>> read from the server gui*
>>>
>>>
>>> Am Montag, 18. Juni 2018 16:39:50 UTC+2 schrieb Moe:
>>>>
>>>> *Hi,*
>>>>
>>>>
>>>> *I'm struggling to implement a time(stamp) based reset_offset and am
>>>> hoping someone here can show me the light. In essence when I call
>>>> reset_offset with a timestamp I get *
>>>>
>>>> *"OffsetOutOfRangeError" *
>>>>
>>>>
>>>> *# Setup:*
>>>> - Kafka:
>>>> Kafka==1.1.0 (also tried with 1.0.0) running in Docker
>>>> (Wurstmeister/Kafka image)
>>>>
>>>> - Consumer/Producer:
>>>> Ubuntu17.04
>>>> pykafka2.7.0
>>>> python3.6
>>>>
>>>>
>>>> *# Kafka Connection*
>>>> client = KafkaClient(hosts='10.25.42.127:9092', broker_version="1.0.0")
>>>> topic = client.topics['offset_retrieval_test.001'.encode()]
>>>>
>>>> *# Producer*
>>>> delivery_reports=False
>>>> auto_start=True
>>>> linger_ms=10.0
>>>>
>>>> ksp = topic.get_producer(delivery_reports=delivery_reports,
>>>> linger_ms=linger_ms, auto_start=auto_start)
>>>>
>>>> for i in range(100):
>>>>
>>>> dict_obj = {
>>>> 'time': time.time(),
>>>> 'value': i
>>>> }
>>>>
>>>> data = json.dumps(dict_obj)
>>>>
>>>> ksp.produce(data.encode())
>>>> time.sleep(0.25)
>>>>
>>>> *# This results in data being written to my Kafka Server as follows:*
>>>>
>>>>
>>>> *# But when I call reset offset such as:*
>>>> offset_ts = int(time.mktime(time.strptime('2018-06-18 15:33:22',
>>>> '%Y-%m-%d %H:%M:%S'))*1000)
>>>> consumer.reset_offsets([(topic.partitions[0],offset_ts)])
>>>>
>>>> while True:
>>>> message = consumer.consume()
>>>> dict_obj = json.loads(message.value)
>>>>
>>>> print(dict_obj)
>>>>
>>>> *# I get the below error and a reset to 0*
>>>>
>>>>
>>>> *# Potentially related; printing the message timestamp (as below)
>>>> results in 0 being printed*
>>>>
>>>> while True:
>>>> message = consumer.consume()
>>>> dict_obj = json.loads(message.value)
>>>>
>>>> print(message.timestamp)
>>>>
>>>>
>>>> *Thanks a million,*
>>>> Moritz
>>>>
>>>>
>>>> --


Re: Question regarding offset commits by Consumers - docs

2018-05-30 Thread Emmett Butler
From the docs
<https://kafka.apache.org/documentation/#impl_offsettracking>:

> The brokers periodically compact the offsets topic since it only needs to
> maintain the most recent offset commit per partition

Offsets are maintained per partition per consumer group, so it doesn't
matter which member of a consumer group is reading a given partition - the
offset will remain consistent.
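
As a hedged illustration (broker address, topic, and group names are
hypothetical): a replacement consumer joining the same pykafka consumer_group
resumes from the group's last committed offsets, while a different group
tracks its own position independently.

from pykafka import KafkaClient
from pykafka.common import OffsetType

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b"t0"]

# First member of group "g1" consumes a few messages and commits its offsets.
c1 = topic.get_simple_consumer(
    consumer_group=b"g1",
    auto_offset_reset=OffsetType.EARLIEST,  # used only if no commit exists yet
    auto_commit_enable=False,
)
for _ in range(10):
    c1.consume()
c1.commit_offsets()
c1.stop()

# A new consumer in the same group picks up from g1's committed offsets,
# regardless of which member originally read each partition.
c2 = topic.get_simple_consumer(consumer_group=b"g1")

# A consumer in a different group is unaffected by g1's commits.
c3 = topic.get_simple_consumer(
    consumer_group=b"g2",
    auto_offset_reset=OffsetType.EARLIEST,
)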


On Wed, May 30, 2018 at 9:23 AM, M. Manna  wrote:

> Hello,
>
> I was trying to remember from the docs (it's been a while) how the last
> committed offset works, i.e. whether it's tracked on a per-consumer-group
> basis or something else. This is specific to when auto.offset.reset is set
> to "earliest"/"latest" and the last committed offset is determined.
>
>
> For example:
>
> 1) C0, C1, C2 are subscribed to topics t0 and t1 - each topic has 3
> partitions:
> 2) Using range assignment - C0 [t0p0,t1p0] C1[t0p1,t1p1] and C2[t0p2,t1p2]
> 3) Now C2 dies or leaves, or "Deactivates" - now rebalance occurs
> 4) After Rebalance - C0[t0p0, t0p1, t1p0, t1p1] and C1[t0p2, t1p2] are the
> new assignments
>
> Assuming that C0, C1, and C2 are all part of the *same* consumer group, and
> C2 successfully committed offset #6 (t0p2 and t1p2) before leaving, will #6
> be considered the latest committed offset for that consumer group? In other
> words, will reassignment of those partitions to any existing consumer under
> that group acknowledge #6 as the latest offset?
>
> If someone could quote the docs, it'll be appreciated. Meanwhile, I will
> try to figure out something from the code if possible.
>
> Thanks,
>



-- 
Emmett Butler | Senior Software Engineer


Re: ListOffsets parameters

2018-04-12 Thread Emmett Butler
Wow, thanks so much for the detailed explanation. I've verified that I can
replicate your results, and adapted it for inclusion in pykafka's
documentation, since this API is a constant source of confusion for users.
https://github.com/Parsely/pykafka/commit/340f8e16c8b0d9830b6fd889f3be7570015ba3eb
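
For anyone who lands here from a search, here is a minimal sketch of querying
offsets from pykafka, which wraps the ListOffsets API under the hood. The
broker address and topic are hypothetical, and the helpers shown
(earliest_available_offsets, latest_available_offsets, fetch_offset_limits)
are pykafka Topic methods; check the API docs for their exact signatures.

import time

from pykafka import KafkaClient

client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b"test2"]

# Oldest retained offset and next-to-be-written offset for each partition
print(topic.earliest_available_offsets())
print(topic.latest_available_offsets())

# Offsets written before a wall-clock time, expressed as milliseconds since
# the epoch (the same units discussed above)
ts_ms = int(time.time() * 1000)
print(topic.fetch_offset_limits(ts_ms))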

On Thu, Apr 12, 2018 at 1:19 PM, Andras Beni <andrasb...@cloudera.com>
wrote:

> Hi Emmett,
>
> ListOffsets API tells you about the log segments belonging to (the given
> partitions of) a topic.
> I think I better explain how it behaves by example.
> I have a topic called 'test2' with three partitions (0..2). I produced 2665
> messages to its partition 0. I set up the topic so that it rolls a new
> segment after around 530 messages.
> My two oldest segments were deleted, so now in my kafka/data/test2-0
> directory I have these files:
> /var/local/kafka/data/test2-0/1059.log
> /var/local/kafka/data/test2-0/2119.log
> /var/local/kafka/data/test2-0/1589.log
> /var/local/kafka/data/test2-0/2649.log
> Let's say I call ListOffset API version 0 now with following parameters
> Topic: test2, partition: 0, timestamp: -1 ( == latest), numberOfOffsets :
> 100
> This will result in an offset list of [2665,2649,2119,1589,1059]: one item
> for the first offset in each file, plus the latest offset (but never more
> than 100 items).
> If I set numberOfOffsets to 3, I only get 3 items: [2665,2649,2119]
> Now let's say I use timestamp -2 ( == oldest); this way I only get back the
> oldest offset [1059], regardless of numberOfOffsets.
> Using a real timestamp (e.g. 152355948) will show me offsets that were
> appended to the log before that time (based on last modified date of the
> file).
> If the timestamp means a moment before the oldest message, the returned
> array will be empty.
>
> In version 1 and 2 request and response have changed:
> If timestamp == -1, the latest offset is returned (except if isolation
> level is read committed, because then the latest stable offset is returned)
> if timestamp == -2, the oldest offset is returned
> if timestamp is a real time value, the log is searched for the last offset
> whose timestamp is smaller than the given timestamp.
>
> I hope this helps,
> Andras
>
>
>
> On Mon, Mar 26, 2018 at 6:33 PM, Emmett Butler <emm...@parsely.com> wrote:
>
> > Hi users,
> >
> > I'm the maintainer of the PyKafka <https://github.com/parsely/pykafka>
> > library and I'm working on improving its support for the ListOffsets API.
> > Some questions:
> >
> > Kafka version: 1.0.0
> > I'm using this documentation
> > <https://kafka.apache.org/protocol#The_Messages_ListOffsets> for
> > reference.
> >
> > In some cases, ListOffsets requests return an empty array of offsets. Is
> > this expected behavior? If so, when?
> >
> > What format is the Timestamp parameter expected in? I've been using
> > milliseconds since epoch (python: time.time() * 1000), but I haven't
> > managed to get a response that isn't either [0] or [] using this
> approach.
> > Could this have to do with the number of log segments on my topic, or the
> > presence of a time index? How do I make a request including a Timestamp
> > (not special values -1 or -2) that returns a valid offset? What is the
> > meaning of a [0] response in this context?
> >
> > What is the MaxNumberOfOffsets parameter supposed to do? When I request
> max
> > 10 offsets, I get back [12345, 0] (two offsets). Again, could this have
> to
> > do with the number of log segments on my topic?
> >
> > Related PyKafka issue tickets for reference:
> > https://github.com/Parsely/pykafka/issues/728
> > https://github.com/Parsely/pykafka/issues/733
> >
> > Thanks for your help.
> >
> > --
> > Emmett Butler | Software Engineer
> >
>



-- 
Emmett Butler | Software Engineer


Fwd: ListOffsets parameters

2018-03-26 Thread Emmett Butler
Hi users,

I'm the maintainer of the PyKafka <https://github.com/parsely/pykafka>
library and I'm working on improving its support for the ListOffsets API.
Some questions:

Kafka version: 1.0.0
I'm using this documentation
<https://kafka.apache.org/protocol#The_Messages_ListOffsets> for reference.

In some cases, ListOffsets requests return an empty array of offsets. Is
this expected behavior? If so, when?

What format is the Timestamp parameter expected in? I've been using
milliseconds since epoch (python: time.time() * 1000), but I haven't
managed to get a response that isn't either [0] or [] using this approach.
Could this have to do with the number of log segments on my topic, or the
presence of a time index? How do I make a request including a Timestamp
(not special values -1 or -2) that returns a valid offset? What is the
meaning of a [0] response in this context?

What is the MaxNumberOfOffsets parameter supposed to do? When I request max
10 offsets, I get back [12345, 0] (two offsets). Again, could this have to
do with the number of log segments on my topic?

Related PyKafka issue tickets for reference:
https://github.com/Parsely/pykafka/issues/728
https://github.com/Parsely/pykafka/issues/733

Thanks for your help.

-- 
Emmett Butler | Software Engineer


Clients list

2017-08-23 Thread Emmett Butler
Hi Kafka users,

I maintain the PyKafka <https://github.com/Parsely/pykafka> python client
library, and the PyKafka listing on the clients page
<https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-Python> is
outdated. I made a Confluence account, but I don't see a way to edit the
page. How can I get edit access to this page so I can keep PyKafka's
listing up to date?

-- 
Emmett Butler | Software Engineer