One more thing to add:

You could build your own tool to set specific offsets for each partition.
The reset tool mentioned below does something similar: it sets the offset
for all partitions to zero. You could copy that code and extend it to set
an arbitrary offset for each partition.
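
A rough sketch of what such a tool could look like, using the plain
consumer API instead of the reset tool's code. Everything specific here is
an assumption for illustration: the broker address, the topic name, the
partition numbers and the offsets are placeholders, and the class name
SetOffsets is made up. The group id "test" is the application.id from the
first mail (Streams uses the application.id as its consumer group id). The
application should be stopped while this runs, otherwise the commit may be
rejected or overwritten by the running instances.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SetOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address (assumption).
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Must match the application.id of the Streams application.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // We commit explicitly below, so disable auto commit.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  ByteArrayDeserializer.class.getName());

        // Hypothetical topic, partitions and target offsets.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-input-topic", 0), new OffsetAndMetadata(42L));
        offsets.put(new TopicPartition("my-input-topic", 1), new OffsetAndMetadata(1000L));

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment: no group rebalance is triggered.
            consumer.assign(offsets.keySet());
            // Store the chosen offsets as the group's committed positions.
            consumer.commitSync(offsets);
        }
    }
}
```

On the next start, the application would resume from the committed offsets,
provided they are still within the range the broker retains.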

However, in most cases, auto.offset.reset should cover your use case
out of the box.
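
To make the behaviour concrete, here is a simplified model of the decision
the consumer makes when it starts reading a partition. This is plain Java
for illustration only, not the actual consumer code; the class and method
names (OffsetResetModel, startingOffset) are made up, and only the
"earliest" and "latest" policies are modelled.

```java
import java.util.OptionalLong;

public class OffsetResetModel {
    /**
     * Returns the offset a consumer would start from.
     *
     * @param committed   committed offset for the partition, empty if none
     * @param logStart    earliest offset still available on the broker
     * @param logEnd      offset of the next record to be written
     * @param resetPolicy "earliest" or "latest"
     */
    public static long startingOffset(OptionalLong committed, long logStart,
                                      long logEnd, String resetPolicy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            // A valid committed offset always wins; the reset policy is ignored.
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // No committed offset, or it points at data that no longer exists.
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalArgumentException("unsupported policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Committed offset is still valid: policy has no effect.
        System.out.println(startingOffset(OptionalLong.of(150), 100, 200, "earliest")); // 150
        // Committed offset was deleted by retention: fall back to earliest.
        System.out.println(startingOffset(OptionalLong.of(50), 100, 200, "earliest"));  // 100
        // No committed offset at all: fall back to latest.
        System.out.println(startingOffset(OptionalLong.empty(), 100, 200, "latest"));   // 200
    }
}
```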

-Matthias

On 11/21/16 5:38 AM, Eno Thereska wrote:
> Hi Sachin,
> 
> There is no need to check within the app whether the offset exists or not, 
> since the consumer code will do that check automatically for you. So in 
> practice an app will include the property below (e.g., set to "earliest"), 
> but that will only have an effect if the consumer detects that the offsets do 
> not exist anymore. If the offsets exist, then that line is a no-op.
> 
> So in summary, I'd just include that property, and no more code changes are 
> required.
> 
> Thanks
> Eno
> 
>> On 21 Nov 2016, at 12:11, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>> So in my Java code, how can I check whether there is no initial offset
>> in Kafka, or whether the current offset no longer exists on the server
>> (e.g. because that data has been deleted)?
>>
>> In that case, as you said, I can set the reset policy with:
>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //or latest
>>
>> Thanks
>> Sachin
>>
>>
>> On Mon, Nov 21, 2016 at 4:16 PM, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>
>>> Hi Sachin,
>>>
>>> Currently you can only change the following global configuration by using
>>> "earliest" or "latest", as shown in the code snippet below. As the Javadoc
>>> mentions: "What to do when there is no initial offset in Kafka or if the
>>> current offset does not exist any more on the server (e.g. because that
>>> data has been deleted)":
>>>
>>> ...
>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>> ...
>>> return new KafkaStreams(builder, props);
>>>
>>>
>>>
>>> In addition, there is a tool to reset the offsets of all topics to the
>>> beginning. This is useful for reprocessing:
>>> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>>>
>>> However, there is no option currently for resetting the offset to an
>>> arbitrary offset.
>>>
>>> Thanks
>>> Eno
>>>
>>>> On 21 Nov 2016, at 10:37, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>> Hi
>>>> I am running a streaming application with
>>>> streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>>>>
>>>> How do I find out the offsets for each of the source, intermediate and
>>>> internal topics associated with this application?
>>>>
>>>> And how can I reset them to some specific value, via shell or otherwise?
>>>>
>>>> Thanks
>>>> Sachin
> 
> 
