Re: Java 8 stream consumer pattern

2017-05-18 Thread Robert Quinlivan
Thanks, Matthias, I had not considered this. I will take a look at the
streams use case.

Thanks

On Wed, May 17, 2017 at 4:58 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Did you try out Kafka Streams API instead of wrapping the consumer? It
> does support Lambdas already:
>
> https://github.com/confluentinc/examples/blob/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/MapFunctionLambdaExample.java#L126
>
> Full docs: http://docs.confluent.io/current/streams/index.html
>
>
> -Matthias
>
> On 5/17/17 1:45 PM, Robert Quinlivan wrote:
> > Hello,
> >
> > I've been looking at writing a Java 8 streams API wrapper for the Kafka
> > consumer. Since this seems like a common use case I was wondering if
> > someone in the user community had already begun a project like this.
> >
> > My goal is to be able to get back a Stream<ConsumerRecord<K, V>> wrapping
> > the results of #poll() which can then be passed into a map/filter
> pipeline.
> > I am using an underlying blocking queue data structure to buffer in
> memory
> > and using Stream.generate() to pull records. Any recommendations on a
> best
> > approach here?
> >
> > Thanks
> >
>
>
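
For anyone finding this thread later, here is a minimal sketch of the
Streams-with-lambdas approach against the 0.10.x Streams API. The application
id and topic names below are made up for the example.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class LambdaStreamsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "lambda-streams-sketch"); // placeholder id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

            KStreamBuilder builder = new KStreamBuilder();
            KStream<String, String> input = builder.stream("input-topic"); // placeholder topic names
            input.filter((key, value) -> value != null)       // Java 8 lambdas plug straight into the DSL
                 .mapValues(value -> value.toUpperCase())
                 .to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();
        }
    }

The filter/mapValues chain plays the same role as the Stream<ConsumerRecord>
pipeline in the original question, with consumption, offsets, and threading
handled by the Streams runtime.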


-- 
Robert Quinlivan
Software Engineer, Signal


Java 8 stream consumer pattern

2017-05-17 Thread Robert Quinlivan
Hello,

I've been looking at writing a Java 8 streams API wrapper for the Kafka
consumer. Since this seems like a common use case I was wondering if
someone in the user community had already begun a project like this.

My goal is to be able to get back a Stream<ConsumerRecord<K, V>> wrapping
the results of #poll() which can then be passed into a map/filter pipeline.
I am using an underlying blocking queue data structure to buffer in memory
and using Stream.generate() to pull records. Any recommendations on a best
approach here?
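
For reference, a rough sketch of what I have in mind, with a made-up class
name and poll timeout; it assumes the returned stream is consumed
sequentially, since KafkaConsumer is not thread-safe:

    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.stream.Stream;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class StreamingConsumer<K, V> implements AutoCloseable {
        private final KafkaConsumer<K, V> consumer;
        private final BlockingQueue<ConsumerRecord<K, V>> buffer = new LinkedBlockingQueue<>();

        public StreamingConsumer(Properties props, String topic) {
            this.consumer = new KafkaConsumer<>(props);
            this.consumer.subscribe(Collections.singletonList(topic));
        }

        // Infinite stream backed by poll(); each element is a single ConsumerRecord.
        public Stream<ConsumerRecord<K, V>> records() {
            return Stream.generate(() -> {
                // Refill the in-memory buffer from poll() whenever it runs dry.
                while (buffer.isEmpty()) {
                    consumer.poll(1000).forEach(buffer::add);
                }
                return buffer.remove();
            });
        }

        @Override
        public void close() {
            consumer.close();
        }
    }

Usage would then be along the lines of
new StreamingConsumer<String, String>(props, "my-topic").records().filter(...).map(ConsumerRecord::value),
with commit semantics (auto vs. manual) still decided outside the stream.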

Thanks
-- 
Robert Quinlivan
Software Engineer, Signal


Re: Strict ordering of messages in Kafka

2017-05-15 Thread Robert Quinlivan
You can do it, but the performance trade-off will be very costly. There is
no ordering guarantee between partitions, so you would have to work with
topics that have only one partition. I believe you would also need to set
batch.size in the producer to 1 to ensure messages aren't re-ordered
within the producer buffer. As far as I'm aware, acks=all isn't necessary
for ordering.
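
For concreteness, a rough sketch of the producer settings being discussed
here; whether batch.size=1 and acks=all are strictly required is exactly the
question in this thread, and the topic name is a placeholder:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // Only one request in flight, so retries cannot reorder batches on the wire.
    props.put("max.in.flight.requests.per.connection", "1");
    // batch.size is in bytes; 1 effectively disables batching, as suggested above.
    props.put("batch.size", "1");
    // acks=all is about durability rather than ordering.
    props.put("acks", "all");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    // Everything goes to the topic's single partition, preserving send order.
    producer.send(new ProducerRecord<>("strictly-ordered-topic", "some value"));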

On Mon, May 15, 2017 at 10:50 AM, João Peixoto <joao.harti...@gmail.com>
wrote:

> Afaik that is not possible out of the box as that would require
> synchronization across multiple threads/instances. The throughput of such
> an approach would be terrible as the parallelism of KStreams is tightly
> coupled with the number of partitions.
>
> I'd say if you need such ordering you should reconsider the partitioning
> key, as that would be the way to do it.
>
> On Mon, May 15, 2017 at 8:22 AM Wong, Janette <janette.w...@rbc.com.invalid
> >
> wrote:
>
> > Hi,
> >
> > Is it that there is no way to guarantee strict ordering of messages
> within
> > Kafka across partitions?
> > Within a single partition, the only way for strict ordering is to set
> > max.in.flight.requests.per.connection=1 and acks=all ?
> >
> > Thanks,
> > Janette
> >
> > Janette Wong, P.Eng. | janette.w...@rbc.com | Senior Application
> > Architect - Analytics | Digital Architecture | T: 416.348.6051 | M:
> > 647.221.7836 | RBC WPP, 3rd floor
> >
> >
> >
> >
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: possible kafka bug, maybe in console producer/consumer utils

2017-04-18 Thread Robert Quinlivan
> >> >> or even a single read so I'll try here. My original msg, slightly
> >> >> edited, was:
> >> >>
> >> >> 
> >> >>
> >> >> (windows 2K8R2 fully patched, 16GB ram, fairly modern dual core xeon
> >> >> server, latest version of java)
> >> >>
> >> >> I've spent several days trying to sort out unexpected behaviour
> >> >> involving kafka and the kafka console producer and consumer.
> >> >>
> >> >> If I set the console producer and console consumer to look at the
> >> >> same topic then I can type lines into the producer window and see
> them
> >> >> appear in the consumer window, so it works.
> >> >>
> >> >> If I try to pipe in large amounts of data to the producer, some gets
> >> >> lost and the producer reports errors eg.
> >> >>
> >> >> [2017-04-17 18:14:05,868] ERROR Error when sending message to topic
> >> >> big_ptns1_repl1_nozip with key: null, value: 55 bytes with error:
> >> >> (org.apache.kafka.clients.
> >> >> producer.internals.ErrorLoggingCallback)
> >> >> org.apache.kafka.common.errors.TimeoutException: Batch containing 8
> >> >> record(s) expired due to timeout while requesting metadata from
> >> >> brokers for big_ptns1_repl1_nozip-0
> >> >>
> >> >> I'm using as input either a file of Shakespeare's full works (about 5.4
> >> >> meg ascii), or a much larger file of Shakespeare's full works
> >> >> replicated 900 times to make it about 5GB. Lines are ascii and short,
> >> >> and each line should be a single record when read in by the console
> >> >> producer. I need to do some benchmarking on time and space and this
> >> >> was my first try.
> >> >>
> >> >> As mentioned, data gets lost. I presume it is expected that any data
> >> >> we pipe into the producer should arrive in the consumer, so if I do
> >> >> this in one windows console:
> >> >>
> >> >> kafka-console-consumer.bat --bootstrap-server localhost:9092  --topic
> >> >> big_ptns1_repl1_nozip --zookeeper localhost:2181 >
> >> >> F:\Users\me\Desktop\shakespear\single_all_shakespear_OUT.txt
> >> >>
> >> >> and this in another:
> >> >>
> >> >> kafka-console-producer.bat --broker-list localhost:9092  --topic
> >> >> big_ptns1_repl1_nozip <
> >> >> F:\Users\me\Desktop\shakespear\complete_works_no_bare_lines.txt
> >> >>
> >> >> then the output file "single_all_shakespear_OUT.txt" should be
> >> >> identical to the input file "complete_works_no_bare_lines.txt"
> except
> >> >> it's not. For the complete works (about 5.4 meg uncompressed) I lost
> >> >> about 130K in the output.
> >> >> For the replicated shakespeare, which is about 5GB, I lost about 150
> >> meg.
> >> >>
> >> >> Surely this can't be right. It is repeatable, but the errors seem to
> >> >> start at different places in the file each time.
> >> >>
> >> >> I've done this using all 3 versions of kafka in the 0.10.x.y branch
> >> >> and I get the same problem (the above commands were using the
> 0.10.0.0
> >> >> branch so they look a little obsolete but they are right for that
> >> >> branch I think). It's cost me some days.
> >> >> So, am I making a mistake, if so what?
> >> >>
> >> >> thanks
> >> >>
> >> >> jan
> >> >>
> >> >
> >>
> >
>



-- 
Robert Quinlivan
Software Engineer, Signal


Kafka producer does not respect added partitions

2017-03-24 Thread Robert Quinlivan
Hello,

I have added partitions to a topic. The new partitions appear in the
consumer assignments and in the topics listing they have the correct number
of ISRs. However, the producer still does not write to the new partitions.

My producer writes in a round-robin fashion, using the Cluster's reported
partition count. I have seen no mention of a need to restart or reconfigure
the producer in order to pick up the added partitions. Is this required?

Thanks
-- 
Robert Quinlivan
Software Engineer, Signal


Re: Kafka consumer offset reset

2017-03-21 Thread Robert Quinlivan
What offset do you want to reset them to? The easiest way to adjust offsets
in 0.10 is to attach a consumer for the target topic-partition, seek to the
position you desire, and commit that new offset.
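
Roughly like this, where the group id, topic, partition, and target offset
are all placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "group-to-reset");   // the consumer group whose offset you are moving
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        TopicPartition tp = new TopicPartition("my-topic", 0);
        consumer.assign(Collections.singletonList(tp));   // manual assignment avoids a group rebalance
        consumer.seek(tp, 12345L);                        // the position you want the group to resume from
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(12345L)));
    }

Run this while the real consumers in that group are stopped; when they
reconnect they resume from the committed position.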

On Tue, Mar 21, 2017 at 9:56 AM, Jakub Stransky <stransky...@gmail.com>
wrote:

> Hello,
>
> just recently migrated to using Kafka 0.10.1.0
>
> I would like to reset position for some consumers. I went through
> documentation and couldn't spot it how to achieve that. All what I got is
> that v 10 reduces usage of zookeeper and clients have possibility to use
> different storage for maintaining the offsets.
>
> Could someone more experienced elaborate a bit on this topic?
>
> Thanks
> jakub
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: ConsumerRebalanceListerner

2017-03-20 Thread Robert Quinlivan
Does the consumer or producer log anything when you connect the new
consumer? Is there anything logged in the broker logs?

On Mon, Mar 20, 2017 at 10:58 AM, jeffrey venus <jeffrey.ve...@gmail.com>
wrote:

> Hi,
>   I am trying to use Kafka as a messaging platform for my microservices.
> There is a problem I am facing in real time: when I try to bring in a new
> consumer group to consume from a certain topic, I have to restart the
> producer and only then does it (the new consumer group) start consuming.
> Is there any other way to do this without disturbing the producer?
>
>
> Regards
> V G Sunjay Jeffrish
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: Offset commit request failing

2017-03-17 Thread Robert Quinlivan
Thanks for the response. Reading through that thread, it appears that this
issue was addressed with KAFKA-3810
<https://issues.apache.org/jira/browse/KAFKA-3810>. This change eases the
restriction on fetch size between replicas. However, should the outcome be
a more comprehensive change to the serialization format of the request? The
size of the group metadata currently grows linearly with the number of
topic-partitions. This is difficult to tune for in a configuration using
topic auto creation.



On Fri, Mar 17, 2017 at 3:17 AM, James Cheng <wushuja...@gmail.com> wrote:

> I think it's due to the high number of partitions and the high number of
> consumers in the group. The group coordination info to keep track of the
> assignments actually happens via a message that travels through the
> __consumer_offsets topic. So with so many partitions and consumers, the
> message gets too big to go through the topic.
>
> There is a long thread here that discusses it. I don't remember what
> specific actions came out of that discussion.
> http://search-hadoop.com/m/Kafka/uyzND1yd26N1rFtRd1?subj=+DISCUSS+scalability+limits+in+the+coordinator
>
> -James
>
> Sent from my iPhone
>
> > On Mar 15, 2017, at 9:40 AM, Robert Quinlivan <rquinli...@signal.co>
> wrote:
> >
> > I should also mention that this error was seen on broker version
> 0.10.1.1.
> > I found that this condition sounds somewhat similar to KAFKA-4362
> > <https://issues.apache.org/jira/browse/KAFKA-4362>, but that issue was
> > submitted in 0.10.1.1 so they appear to be different issues.
> >
> > On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan <rquinli...@signal.co
> >
> > wrote:
> >
> >> Good morning,
> >>
> >> I'm hoping for some help understanding the expected behavior for an
> offset
> >> commit request and why this request might fail on the broker.
> >>
> >> *Context:*
> >>
> >> For context, my configuration looks like this:
> >>
> >>   - Three brokers
> >>   - Consumer offsets topic replication factor set to 3
> >>   - Auto commit enabled
> >>   - The user application topic, which I will call "my_topic", has a
> >>   replication factor of 3 as well and 800 partitions
> >>   - 4000 consumers attached in consumer group "my_group"
> >>
> >>
> >> *Issue:*
> >>
> >> When I attach the consumers, the coordinator logs the following error
> >> message repeatedly for each generation:
> >>
> >> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message
> for
> >> group my_group generation 2066 failed due to org.apache.kafka.common.
> >> errors.RecordTooLargeException, returning UNKNOWN error code to the
> >> client (kafka.coordinator.GroupMetadataManager)
> >>
> >> *Observed behavior:*
> >>
> >> The consumer group does not stay connected long enough to consume
> >> messages. It is effectively stuck in a rebalance loop and the "my_topic"
> >> data has become unavailable.
> >>
> >>
> >> *Investigation:*
> >>
> >> Following the Group Metadata Manager code, it looks like the broker is
> >> writing to a cache after it writes an Offset Commit Request to the log
> >> file. If this cache write fails, the broker then logs this error and
> >> returns an error code in the response. In this case, the error from the
> >> cache is MESSAGE_TOO_LARGE, which is logged as a
> RecordTooLargeException.
> >> However, the broker then sets the error code to UNKNOWN on the Offset
> >> Commit Response.
> >>
> >> It seems that the issue is the size of the metadata in the Offset Commit
> >> Request. I have the following questions:
> >>
> >>   1. What is the size limit for this request? Are we exceeding the size
> >>   which is causing this request to fail?
> >>   2. If this is an issue with metadata size, what would cause abnormally
> >>   large metadata?
> >>   3. How is this cache used within the broker?
> >>
> >>
> >> Thanks in advance for any insights you can provide.
> >>
> >> Regards,
> >> Robert Quinlivan
> >> Software Engineer, Signal
> >>
> >
> >
> >
> > --
> > Robert Quinlivan
> > Software Engineer, Signal
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: Offset commit request failing

2017-03-15 Thread Robert Quinlivan
I should also mention that this error was seen on broker version 0.10.1.1.
I found that this condition sounds somewhat similar to KAFKA-4362
<https://issues.apache.org/jira/browse/KAFKA-4362>, but that issue was
submitted in 0.10.1.1 so they appear to be different issues.

On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan <rquinli...@signal.co>
wrote:

> Good morning,
>
> I'm hoping for some help understanding the expected behavior for an offset
> commit request and why this request might fail on the broker.
>
> *Context:*
>
> For context, my configuration looks like this:
>
>- Three brokers
>- Consumer offsets topic replication factor set to 3
>- Auto commit enabled
>- The user application topic, which I will call "my_topic", has a
>replication factor of 3 as well and 800 partitions
>- 4000 consumers attached in consumer group "my_group"
>
>
> *Issue:*
>
> When I attach the consumers, the coordinator logs the following error
> message repeatedly for each generation:
>
> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for
> group my_group generation 2066 failed due to org.apache.kafka.common.
> errors.RecordTooLargeException, returning UNKNOWN error code to the
> client (kafka.coordinator.GroupMetadataManager)
>
> *Observed behavior:*
>
> The consumer group does not stay connected long enough to consume
> messages. It is effectively stuck in a rebalance loop and the "my_topic"
> data has become unavailable.
>
>
> *Investigation:*
>
> Following the Group Metadata Manager code, it looks like the broker is
> writing to a cache after it writes an Offset Commit Request to the log
> file. If this cache write fails, the broker then logs this error and
> returns an error code in the response. In this case, the error from the
> cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException.
> However, the broker then sets the error code to UNKNOWN on the Offset
> Commit Response.
>
> It seems that the issue is the size of the metadata in the Offset Commit
> Request. I have the following questions:
>
>1. What is the size limit for this request? Are we exceeding the size
>which is causing this request to fail?
>2. If this is an issue with metadata size, what would cause abnormally
>    large metadata?
>3. How is this cache used within the broker?
>
>
> Thanks in advance for any insights you can provide.
>
> Regards,
> Robert Quinlivan
> Software Engineer, Signal
>



-- 
Robert Quinlivan
Software Engineer, Signal


Offset commit request failing

2017-03-15 Thread Robert Quinlivan
Good morning,

I'm hoping for some help understanding the expected behavior for an offset
commit request and why this request might fail on the broker.

*Context:*

For context, my configuration looks like this:

   - Three brokers
   - Consumer offsets topic replication factor set to 3
   - Auto commit enabled
   - The user application topic, which I will call "my_topic", has a
   replication factor of 3 as well and 800 partitions
   - 4000 consumers attached in consumer group "my_group"


*Issue:*

When I attach the consumers, the coordinator logs the following error
message repeatedly for each generation:

ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for
group my_group generation 2066 failed due to
org.apache.kafka.common.errors.RecordTooLargeException, returning UNKNOWN
error code to the client (kafka.coordinator.GroupMetadataManager)

*Observed behavior:*

The consumer group does not stay connected long enough to consume messages.
It is effectively stuck in a rebalance loop and the "my_topic" data has
become unavailable.


*Investigation:*

Following the Group Metadata Manager code, it looks like the broker is
writing to a cache after it writes an Offset Commit Request to the log
file. If this cache write fails, the broker then logs this error and
returns an error code in the response. In this case, the error from the
cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException.
However, the broker then sets the error code to UNKNOWN on the Offset
Commit Response.

It seems that the issue is the size of the metadata in the Offset Commit
Request. I have the following questions:

   1. What is the size limit for this request? Are we exceeding the size
   which is causing this request to fail?
   2. If this is an issue with metadata size, what would cause abnormally
   large metadata?
   3. How is this cache used within the broker?


Thanks in advance for any insights you can provide.

Regards,
Robert Quinlivan
Software Engineer, Signal


Re: Question on Metadata

2017-03-14 Thread Robert Quinlivan
Did you look at the ConsumerRecord
<https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html>
class?
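
For example, the metadata is available directly off each polled record (rough
sketch with consumer setup omitted; timestamp() assumes a 0.10+ client):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        // Topic, partition, offset, timestamp and key ride along with every record.
        System.out.printf("topic=%s partition=%d offset=%d timestamp=%d key=%s value=%s%n",
                record.topic(), record.partition(), record.offset(),
                record.timestamp(), record.key(), record.value());
    }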

On Tue, Mar 14, 2017 at 11:09 AM, Syed Mudassir Ahmed <
smudas...@snaplogic.com> wrote:

> Can anyone help?
>
> -- Forwarded message --
> From: "Syed Mudassir Ahmed" <smudas...@snaplogic.com>
> Date: Mar 14, 2017 12:28 PM
> Subject: Question on Metadata
> To: <users@kafka.apache.org>
> Cc:
>
> Hi guys,
>   When we consume a JMS message, we get a Message object that has methods
> to fetch implicit metadata provided by JMS server.
> http://docs.oracle.com/javaee/6/api/javax/jms/Message.html.  There are
> methods to fetch that implicit metadata such as Expiration, Correlation ID,
> etc.
>
>   Is there a way to fetch any such implicit metadata while consuming a
> kafka message?  Any pointers to do that would be greatly appreciated.
>
>   Thanks in advance.
>
> --
> Thanks,
> Syed.
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: Simple KafkaProducer to handle multiple requests or not

2017-03-13 Thread Robert Quinlivan
There is no need to create a new producer instance for each write request.
In doing so you lose the advantages of the buffering and batching that the
producer offers. In your use case I would recommend having a single running
producer and tuning the batch size and linger.ms settings if you find that
the producer is using too much memory.
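
For example, something like this; the class name, topic, and the particular
batch.size/linger.ms values are placeholders to show where the tuning knobs
live:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public final class SharedProducer {
        // One long-lived producer shared by all request-handling threads; KafkaProducer is thread-safe.
        private static final KafkaProducer<String, String> PRODUCER = create();

        private static KafkaProducer<String, String> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("batch.size", "16384"); // bytes per in-memory batch; lower it if memory is tight
            props.put("linger.ms", "5");      // small wait so concurrent requests batch together
            return new KafkaProducer<>(props);
        }

        private SharedProducer() {}

        public static void send(String topic, String key, String value) {
            // send() is asynchronous and only appends to the producer's buffer.
            PRODUCER.send(new ProducerRecord<>(topic, key, value));
        }
    }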

On Mon, Mar 13, 2017 at 5:05 AM, Amit K <amitk@gmail.com> wrote:

> Hi,
>
> I am using simple kafka producer (java based, version 0.9.0.0) in an
> application where I receive lot of hits (about 50 per seconds, in much like
> servlet way) on application that has kafka producer. Per request comes
> different set of records.
>
> I am using only one instance of kafka producer to push the records to kafka
> cluster. Is that good way to use kafka producer? As it is mentioned in
> documentation, the kafkaproducer can be shared across multiple threads.
>
> Or should be there one kafka producer created to handle one request?
>
> Is there any best practice documents/guidelines to follow for using simple
> java Kafka producer api?
>
> Thanks in advance for your responses.
>
> Thanks,
> Amit
>



-- 
Robert Quinlivan
Software Engineer, Signal


Re: How to set offset for a consumer in Kafka 0.10.0.X

2017-03-08 Thread Robert Quinlivan
The best approach would be:
- Have all consumers in your group shut down
- Have an offset reset tool join with the same group name as above
- Offset tool subscribes to all topic-partitions, seeks to the desired
offset, and commits.
- Offset tool shuts down
- Consumers then restart and re-join the consumer group, resuming at the
offsets that were last committed for each topic-partition
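
Roughly, the reset-tool step would look like this; the topic name and target
offsets are placeholders, and it assumes a single poll(0) is enough for the
tool to complete the group join and receive its assignment:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "group-being-reset");   // must match the group your real consumers use
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList("my-topic"));
        consumer.poll(0);                          // join the group and receive an assignment
        for (TopicPartition tp : consumer.assignment()) {
            consumer.seek(tp, 0L);                 // desired offset per partition (placeholder: beginning)
        }
        consumer.commitSync();                     // commit the sought positions for the group
    }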

On Wed, Mar 8, 2017 at 10:51 AM, Jeff Widman <j...@netskope.com> wrote:

> Yeah, that gist looks like it *should* work. I haven't tested it so can't
> guarantee.
>
> On Tue, Mar 7, 2017 at 7:04 PM, Glen Ogilvie <glen.ogil...@oss.co.nz>
> wrote:
>
> > Hi Jeff,
> >
> > Yes, the work I am doing is ops work. Logstash is consuming from the
> topic
> > + consumer group, and I don't want it to start at the beginning, but
> rather
> > at a specific offset,
> > so setting the offset for the consumer group externally, then starting up
> > logstash is my goal.
> >
> > I'm still a little unclear as to how to do this.
> >
> > Is this python script: http://pastebin.com/tvxj1wTX
> >
> > The right way to go about getting the offset set to a specific value
> > (12345678 in this example) for a specific consumer group?
> >
> > Regards
> > --
> > Glen Ogilvie
> > Open Systems Specialists
> > Level 1, 162 Grafton Road
> > http://www.oss.co.nz/
> >
> > Ph: +64 9 984 3000
> > Mobile: +64 21 684 146
> > GPG Key: ACED9C17
> >
> >
> > From: "Jeff Widman" <j...@netskope.com>
> > To: "users" <users@kafka.apache.org>
> > Sent: Wednesday, 8 March, 2017 1:41:17 PM
> > Subject: Re: How to set offset for a consumer in Kafka 0.10.0.X
> >
> > Offsets for modern kafka consumers are stored in an internal Kafka topic,
> > so they aren't as easy to change as zookeeper.
> >
> > To set a consumer offset, you need a consumer within a consumer group to
> > call commit() with your explicit offset. If needed, you can create a
> dummy
> > consumer and tell it to join an existing consumer group.
> >
> > Take a look at this migration script, especially the part where it
> commits
> > the offset to see how it can work in Scala:
> > https://github.com/apache/kafka/pull/2615/files
> >
> > It's fairly straightforward to do this within most other clients as well.
> > If you're doing some quick ops work where you don't want to spin up the
> > JVM, then it's fairly easy to do this using kafka-python.
> >
> >
> >
> > On Tue, Mar 7, 2017 at 4:08 PM, Glen Ogilvie <glen.ogil...@oss.co.nz>
> > wrote:
> >
> > > Hi,
> > >
> > > We are running Kafka 0.10.0.X, with zookeeper. I'm trying to figure out
> > if
> > > I can manually
> > > set a consumer offset, for a specific consumer when that consumer is
> > > stopped.
> > >
> > > It looks like it used to be done using: kafka.tools.ExportZkOffsets and
> > > kafka.tools.ImportZkOffsets
> > > ( https://cwiki.apache.org/confluence/display/KAFKA/
> > > System+Tools#SystemTools-ConsumerOffsetChecker )
> > >
> > > However, in my version they don't work, because they try to read from
> > > zookeeper /consumers, which is empty. I think they are old tools.
> > >
> > > Does anyone know where in zookeeper the current Kafka keeps consumer
> > > offsets?
> > >
> > > Regards
> > > --
> > > Glen Ogilvie
> > > Open Systems Specialists
> > > Level 1, 162 Grafton Road
> > > http://www.oss.co.nz/
> > >
> > > Ph: +64 9 984 3000
> > > Mobile: +64 21 684 146
> > > GPG Key: ACED9C17
> > >
> >
>



-- 
Robert Quinlivan
Software Engineer, Signal


Consumer group stuck rebalancing with RecordTooLargeException

2017-03-07 Thread Robert Quinlivan
Hello,

I have a consumer group that is continually stuck in a rebalance with the
following error being produced in the broker logs:

[2017-03-07 22:16:20,473] ERROR [Group Metadata Manager on Broker 0]:
Appending metadata message for group tagfire_application generation 951
failed due to org.apache.kafka.common.errors.RecordTooLargeException,
returning UNKNOWN error code to the client
(kafka.coordinator.GroupMetadataManager)

The consumer group cannot attach. How can I resolve this issue on the
broker?

Thanks
-- 
Robert Quinlivan
Software Engineer, Signal


Messages rejected by broker due to size

2017-03-01 Thread Robert Quinlivan
Hello,

I'm looking for a bit of clarification on the outcome of a
RecordTooLargeException in the broker that was not evident from the
documentation.

First, what happens to a large record that is rejected? Is it ignored by
the broker or is there a way to recover a rejected message?

Second, is the message size condition checked before or after
compression/batching is applied? What exactly is "message.max.bytes"
looking at?

Third, what happens if a broker accepts the record from the producer but
the "replica.fetch.max.bytes" setting is lower than the "message.max.bytes"
setting? This seems like an edge case to me. The leader would accept the
record but replicas would not be able to receive it, so it would be lost.
Or does the replica take the max of those two settings in order to avoid
this condition?

Thanks in advance!
-- 
Robert Quinlivan
Software Engineer, Signal


Re: Kafka delay

2017-01-13 Thread Robert Quinlivan
Can you clarify what you mean by "process time"? Is this the time it takes
for a message sent by the producer to appear at a consumer?

You may be seeing the result of batching. The producer batches messages so
if you are testing sending a single message at a time you will get
different performance characteristics than if you are sending many.

On Fri, Jan 13, 2017 at 2:36 AM, Patricia Callejo <
patricia.call...@imdea.org> wrote:

> Hi,
> we are developing a Kafka environment that needs to be able to process up to
> hundreds of messages per second. The point is that if we send a lot of
> messages across different partitions, the processing time is very good (in
> milliseconds); however, if we send just a single message, the minimum time
> required is very high, at least 1 or 2 seconds, sometimes more.
> Is there any configuration we are missing to avoid this minimum delay, or
> is this the minimum time required by Kafka for the whole process?
>
> Best Regards,
> Patricia




-- 
Robert Quinlivan
Software Engineer, Signal


Metric meaning

2017-01-05 Thread Robert Quinlivan
Hello,

Are there more detailed descriptions available for the metrics exposed by
Kafka via JMX? The current documentation provides some information but a
few metrics are not listed in detail – for example, "Log flush rate and
time."

-- 
Robert Quinlivan
Software Engineer, Signal


Multi-topic consumer configuration

2016-12-21 Thread Robert Quinlivan
Hello,

We are using a multi-topic approach for partitioning data. Each topic will
follow a pattern naming convention. We want to design our consumer
configuration so that a slow topic will not block a fast topic, as each
topic will consume at a different rate.

However I am seeing an issue where all topics matching the pattern are
assigned to a single consumer. When there are several consumers in the
group with a single topic with many partitions, the partitions are
automatically distributed among the available consumers in the group. I
expected that calling KafkaConsumer#subscribe(Pattern,
ConsumerRebalanceListener) would follow a similar behavior by distributing
the assigned topics among all consumers in the group.

Is this not the case? What is the expected behavior and how would you
recommend implementing this design?
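
For reference, the subscription in question looks roughly like this (group
id, topic regex, and deserializers are placeholders); how the matched
topic-partitions are then spread across consumers in the group is governed by
the configured partition.assignment.strategy:

    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "pattern-consumers");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Pattern.compile("metrics\\..*"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Logging the assignment shows exactly how topic-partitions land on each consumer.
            System.out.println("assigned: " + partitions);
        }
    });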

Thank you

-- 
Robert Quinlivan
Software Engineer, Signal


Issue with "stuck" consumer in 0.9 broker

2016-12-17 Thread Robert Quinlivan
I am running an 0.9 broker and I'm having trouble viewing and committing
offsets. Upon starting up the broker, I see the following in the kafka.out
log:

[2016-12-17 14:56:14,389] WARN Connected to an old server; r-o mode will be
unavailable (org.apache.zookeeper.ClientCnxnSocket)

I have one client consumer. I am using the new consumer. The
consumer-groups tool reports this:

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
--new-consumer
my_application

However, it throws an exception when checking the position of the
my_application group:

$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092
--new-consumer --describe --group my_application
Consumer group `my_application` does not exist or is rebalancing.

My suspicion is that the broker is stuck in a rebalance. Occasionally, I
see the following in my client logs:

INFO  2016-12-17 09:06:05,410
org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt
to heart beat failed since the group is rebalancing, try to re-join group.
INFO  2016-12-17 09:06:05,430
org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset 2013 is
out of range, resetting offset
INFO  2016-12-17 09:06:05,430
org.apache.kafka.clients.consumer.internals.Fetcher - Fetch offset 30280 is
out of range, resetting offset

Which I'm guessing is the consumer timing out and attempting to rejoin, but
failing to do so.

If it's relevant, my server properties file looks like this:

broker.id=0
auto.create.topics.enable=true
group.max.session.timeout.ms=30
default.replication.factor=1
offsets.topic.replication.factor=1
compression.type=snappy
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
log.dirs=/mnt/kafka-logs
num.partitions=1
delete.topic.enable=true
log.flush.interval.messages=1
log.flush.interval.ms=1000
log.retention.minutes=1
offsets.retention.minutes=86400
log.segment.bytes=486870912
log.retention.check.interval.ms=6
zookeeper.connect=localhost:2181/my_chroot
zookeeper.connection.timeout.ms=100
inter.broker.protocol.version=0.9.0.1
port=9092
offsets.topic.compression.codec=2


I'm having trouble understanding what is broken without any further
information logged from the brokers. Is there a switch in the broker config
that can provide more verbose logging, or is there another way of checking
the offsets?

Thank you
-- 
Robert Quinlivan
Software Engineer, Signal