Re: 0.9 consumer reading a range of log messages

2016-01-06 Thread Rajiv Kurian
Thanks for the replies, Jason!

A couple of follow up questions:
i. If I call seekToEnd() on the consumer and then use position(), does the
position() call make a blocking IO call? It is not entirely clear from the
documentation whether it will block or not.

ii. If the position() call does indeed block, are there any consequences to
me not calling poll() often enough? Is there a way for me to limit how long
I allow it to block, maybe by passing a timeout parameter? I only use
manual assignments, so I am hoping that there is no consequence of
infrequent heartbeats, etc., through poll starvation.

Thanks,
Rajiv




Re: 0.9 consumer reading a range of log messages

2016-01-06 Thread Jason Gustafson
Hi Rajiv,

Answers below:

> i) How do I get the last log offset from the Kafka consumer?


To get the last offset, first call seekToEnd() and then use position().
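
For illustration, a minimal sketch of that sequence against the 0.9
consumer API (the topic name, partition number, and already-constructed
consumer here are assumptions, not taken from this thread):

    import java.util.Collections;
    import org.apache.kafka.common.TopicPartition;

    // Manually assign the partition of interest.
    TopicPartition tp = new TopicPartition("myTopic", 0);
    consumer.assign(Collections.singletonList(tp));
    // seekToEnd() only marks the position for reset; nothing is fetched yet.
    consumer.seekToEnd(tp);
    // position() performs the actual lookup (and may block) and returns
    // the current end-of-log offset.
    long endOffset = consumer.position(tp);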

> ii) If I ask the consumer to seek to the beginning via the
> consumer.seekToBeginning(newTopicPartition) call, will it handle the case
> where the log has rolled over in the meanwhile and what was considered the
> beginning offset is no longer present?


The call to seekToBeginning() only sets a flag indicating that a reset is
needed. The actual position will not be fetched until you call poll() or
position(). This means that the window for an out of range offset should be
small, but of course it could happen. The behavior of the consumer when an
offset is out of range is controlled with the "auto.offset.reset"
configuration. If you use the "earliest" policy, then the consumer will
automatically reset the position to whatever the current earliest offset
is. You might also choose to use no automatic reset policy by specifying
"none." In this case, poll() will throw an OffsetOutOfRangeException, which
you can catch and manually re-seek to the beginning.
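
For example, a sketch of that handling (the property setup and the byte[]
key/value types are assumptions for illustration):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;

    Properties props = new Properties();
    props.put("auto.offset.reset", "none");
    // ... build the consumer with these and the other usual properties ...

    try {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
        // process records ...
    } catch (OffsetOutOfRangeException e) {
        // The position fell outside the log; re-seek to the beginning by hand.
        consumer.seekToBeginning(tp);
    }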

> iii) What settings do I need on the Kafka broker (besides
> log.retention.minutes = 10) to ensure that my partitions don't retain any
> more than 10 minutes of data (plus a couple of minutes is fine)? Do I need
> to tune how often Kafka checks for log deletion eligibility? Any other
> settings I should play with to ensure timely deletion?


Log retention is currently at the granularity of log segments. This means
that you cannot generally guarantee that messages will be deleted within
the configured retention time. However, you can control the segment size
using "log.segment.bytes" and the delay before deletion with "
log.segment.delete.delay.ms." If you can estimate the incoming message
rate, then you can probably tune these settings to get a retention policy
closer to what you're looking for. See here for more info on broker
configuration: https://kafka.apache.org/documentation.html#brokerconfigs.
And for what it's worth, KIP-32, which adds a timestamp to each message,
should provide some better options for handling this.
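
As a rough sketch, broker settings along these lines (the values are
illustrative only, not from this thread, and would need tuning against the
actual message rate):

    # server.properties -- illustrative values
    log.retention.minutes=10
    # Smaller segments let retention delete data closer to the configured time.
    log.segment.bytes=104857600
    # How long a segment marked for deletion lingers before file removal.
    log.segment.delete.delay.ms=60000
    # How often the broker checks for segments eligible for deletion.
    log.retention.check.interval.ms=60000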

-Jason

On Wed, Jan 6, 2016 at 9:37 AM, Rajiv Kurian  wrote:

> I want to use the new 0.9 consumer for a particular application.
>
> My use case is the following:
>
> i) The TopicPartition I need to poll has a short log, say 10 minutes or so
> (log.retention.minutes is set to 10).
>
> ii) I don't use a consumer group, i.e., I manage the partition assignment
> myself.
>
> iii) Whenever I get a new partition assigned to one of my processes
> (through my own mechanism), I want to query for the current end of the log
> and then seek to the beginning of the log. I want to continue reading in a
> straight line till my offset moves from the beginning to the end that I
> queried before beginning to poll. When I am done reading this much data (I
> know the end has moved by the time I've read all of it), I consider myself
> caught up. Note: I only need to do the seek to the beginning of the log,
> which the new consumer allows one to do. I just need to know the end-of-log
> offset so that I know that I have "caught up".
>
> So questions I have are:
>
> i) How do I get the last log offset from the Kafka consumer? The
> SimpleConsumer had a way to get this information. If I can get this info
> from the consumer, I plan to do something like this:
>
>
> private boolean assignNewPartitionAndCatchUp(int newPartition) {
>     final TopicPartition newTopicPartition =
>         new TopicPartition(myTopic, newPartition);
>
>     // Queries the existing partitions and adds this TopicPartition to
>     // the list.
>     List<TopicPartition> newAssignment =
>         createNewAssignmentByAddingPartition(newTopicPartition);
>     consumer.assign(newAssignment);
>
>     // How do I actually do this with the consumer?
>     final long lastMessageOffset = getLastMessageOffset(newTopicPartition);
>
>     consumer.seekToBeginning(newTopicPartition);
>
>     final long timeout = 100;
>     int numIterations = 0;
>     boolean caughtUp = false;
>
>     while (!caughtUp && numIterations < maxIterations) {
>         // Key and value types are assumed to be byte[] here.
>         ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
>         numIterations += 1;
>         for (ConsumerRecord<byte[], byte[]> record : records) {
>             // All messages are processed regularly even if they belong
>             // to other partitions.
>             processRecord(record.value());
>             final int partition = record.partition();
>             final long offset = record.offset();
>             // Only if we find that the new partition has caught up do
>             // we return.
>             if (partition == newPartition && offset >= lastMessageOffset) {
>                 caughtUp = true;
>             }
>         }
>     }
>
>     return caughtUp;
> }

Re: 0.9 consumer reading a range of log messages

2016-01-06 Thread Rajiv Kurian
On Wed, Jan 6, 2016 at 5:31 PM, Jason Gustafson  wrote:

> >
> > i. If I call seekToEnd() on the consumer and then use position(), does
> > the position() call make a blocking IO call? It is not entirely clear
> > from the documentation whether it will block or not.
> >
>
> Yes, position() will block when the offset needs to be reset. You can
> depend on it returning the right offset after a call to seekToEnd().
>
> > ii. If the position() call does indeed block, are there any consequences
> > to me not calling poll() often enough?
>
>
> Since you are using manual assignment, there should be no consequence to
> not calling poll() frequently enough. The heartbeats are only needed when
> the consumer is using automatic assignment. Unfortunately, there is no way
> to set a timeout for this call at the moment. In the worst case, if the
> partition leader is unavailable, it could block indefinitely. We've
> considered several times adding a "max.block.ms" configuration setting
> which would work just as it does for the producer, but we wanted to gauge
> the level of interest before adding yet another setting. Typically we would
> expect the partition leader to become available "soon" and the thought was
> that users would generally just retry anyway.
>
I think a timeout setting for all blocking calls would be very useful.
Given that the subscriber is going to be called from a single thread, any
blocking call can starve other multiplexed partitions even if their brokers
are fine. So a single down broker could grind the entire consumer to a halt.
A user might instead decide to give up after some timeout and do something
more meaningful than blocking the entire thread.
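
One workaround under the current API is a watchdog thread that calls
wakeup(), the one KafkaConsumer method documented as safe to invoke from
another thread. A minimal sketch, assuming a consumer and an assigned
TopicPartition tp, and assuming that wakeup() interrupts a blocking
position() call the same way it aborts a long poll():

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.common.errors.WakeupException;

    ScheduledExecutorService watchdog =
        Executors.newSingleThreadScheduledExecutor();
    // Arrange for the blocked consumer thread to be woken after 5 seconds.
    ScheduledFuture<?> deadline =
        watchdog.schedule(consumer::wakeup, 5, TimeUnit.SECONDS);
    try {
        long endOffset = consumer.position(tp);
        deadline.cancel(false);  // returned in time; cancel the wakeup
    } catch (WakeupException e) {
        // position() exceeded the deadline: back off, retry, or give up.
    }
    // Note: if the wakeup fires just after position() returns, the next
    // blocking consumer call will see it; production code would need to
    // handle that race.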

>
> -Jason

Re: 0.9 consumer reading a range of log messages

2016-01-06 Thread Jason Gustafson
>
> i. If I call seekToEnd() on the consumer and then use position(), does the
> position() call make a blocking IO call? It is not entirely clear from the
> documentation whether it will block or not.
>

Yes, position() will block when the offset needs to be reset. You can
depend on it returning the right offset after a call to seekToEnd().

> ii. If the position() call does indeed block, are there any consequences to
> me not calling poll() often enough?


Since you are using manual assignment, there should be no consequence to
not calling poll() frequently enough. The heartbeats are only needed when
the consumer is using automatic assignment. Unfortunately, there is no way
to set a timeout for this call at the moment. In the worst case, if the
partition leader is unavailable, it could block indefinitely. We've
considered several times adding a "max.block.ms" configuration setting
which would work just as it does for the producer, but we wanted to gauge
the level of interest before adding yet another setting. Typically we would
expect the partition leader to become available "soon" and the thought was
that users would generally just retry anyway.

-Jason
