Re: How to get a single available message from kafka (case where OffsetRange.fromOffset == OffsetRange.untilOffset)

2015-11-30 Thread Cody Koeninger
If you had exactly 1 message in the 0th topicpartition, to read it you
would use

OffsetRange("topicname", 0, 0, 1)

Kafka's simple shell consumer in that case would print

next offset = 1


So trying instead to consume

OffsetRange("topicname", 0, 1, 2)

shouldn't be expected to work, since offset 1 has not been written yet.
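
To make the half-open semantics concrete, here is a minimal sketch in plain Python. It is a toy model, not Spark's or Kafka's actual code: the `OffsetRange` dataclass, the `read` helper, and the list-backed `log` are hypothetical stand-ins, and the log is assumed to start at offset 0.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OffsetRange:
    """Toy stand-in (an assumption, not the Spark class): [from_offset, until_offset)."""
    topic: str
    partition: int
    from_offset: int   # inclusive: first offset to read
    until_offset: int  # exclusive: one past the last offset to read


def read(log, rng):
    """Return the messages in the half-open range, rejecting ranges past the log end."""
    next_offset = len(log)  # offset the next produced message would receive
    if not (0 <= rng.from_offset <= rng.until_offset <= next_offset):
        raise ValueError(f"invalid range {rng}: next offset is {next_offset}")
    return log[rng.from_offset:rng.until_offset]


log = ["only message"]  # exactly one message, stored at offset 0

single = read(log, OffsetRange("topicname", 0, 0, 1))  # the one message
empty = read(log, OffsetRange("topicname", 0, 0, 0))   # from == until: nothing

try:
    read(log, OffsetRange("topicname", 0, 1, 2))  # offset 1 does not exist yet
    past_end_rejected = False
except ValueError:
    past_end_rejected = True
```

In this model untilOffset never has to name an existing message: it is at most the "next offset", so a range of N messages starting at fromOffset always ends at fromOffset + N.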



On Sat, Nov 28, 2015 at 8:35 AM, Nikos Viorres wrote:

> Hi,
>
> I am using KafkaUtils.createRDD to retrieve data from Kafka for batch
> processing, and
> when invoking KafkaUtils.createRDD with an OffsetRange where
> OffsetRange.fromOffset == OffsetRange.untilOffset for a particular
> partition, I get an empty RDD.
> The documentation is clear that until is exclusive and from is inclusive,
> but if I use OffsetRange.untilOffset + 1 I get an invalid OffsetRange
> exception during the check.
> Since this should also apply in general (if untilOffset is exclusive you
> cannot fetch it), does it mean that untilOffset is also non-existent in
> Kafka (and thus always exclusive), or am I missing something?
>
> regards
>
> p.s. By manually using the Kafka protocol to query the offsets, I see
> that kafka.api.OffsetRequest.EarliestTime()
> == kafka.api.OffsetRequest.LatestTime() and set to a positive value
>


How to get a single available message from kafka (case where OffsetRange.fromOffset == OffsetRange.untilOffset)

2015-11-28 Thread Nikos Viorres
Hi,

I am using KafkaUtils.createRDD to retrieve data from Kafka for batch
processing, and
when invoking KafkaUtils.createRDD with an OffsetRange where
OffsetRange.fromOffset == OffsetRange.untilOffset for a particular
partition, I get an empty RDD.
The documentation is clear that until is exclusive and from is inclusive,
but if I use OffsetRange.untilOffset + 1 I get an invalid OffsetRange
exception during the check.
Since this should also apply in general (if untilOffset is exclusive you
cannot fetch it), does it mean that untilOffset is also non-existent in
Kafka (and thus always exclusive), or am I missing something?

regards

p.s. By manually using the Kafka protocol to query the offsets, I see
that kafka.api.OffsetRequest.EarliestTime()
== kafka.api.OffsetRequest.LatestTime() and set to a positive value