Hi,

I had started work on an offset manager for Kafka 0.8.x some time back, which
got left midway. This implementation used Kafka topics to store
offsets (similar to the 0.9 implementation).

https://github.com/apache/apex-malhar/pull/156

If the community would use it, I can incorporate the review comments and make it
Malhar-ready.

Thanks,
Dev

On Mon, Jun 6, 2016 at 9:55 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Hi Raja,
>
> Yes, I think if you implement the interface and set it as a property of the
> input operator, it should serve the purpose.
>
> I don't think it would be a bottleneck, since it is just a list of numbers
> and it is only updated once every checkpoint interval.
>
> Regards,
> Siyuan
>
> On Mon, Jun 6, 2016 at 5:43 PM, Raja.Aravapalli <
> raja.aravapa...@target.com> wrote:
>
>>
>>
>> Thanks a lot Siyuan. It helped me understand better!!
>>
>>
>> So, can you please confirm: if I implement the OffsetManager interface,
>> will it be used to load the initial starting position and to update the
>> offset status (at some interval)?
>>
>> Will the application latency increase significantly if I use HDFS for storage?
>>
>> Thank you very much.
>>
>> Regards,
>> Raja.
>>
>> From: "hsy...@gmail.com" <hsy...@gmail.com>
>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>> Date: Monday, June 6, 2016 at 7:13 PM
>>
>> To: "users@apex.apache.org" <users@apex.apache.org>
>> Subject: Re: kafka offset commit
>>
>> Raja,
>>
>> Not exactly. Apex actually stores offsets as part of the operator state,
>> and the state of each operator is checkpointed internally and periodically
>> (in HDFS by default). For more details, you can read this:
>> https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/
>>
>> So, offsets are stored in HDFS along with the rest of the operator's
>> state, so that the operator can recover from any system failure.
>> Also, in Apex you can do a stateful restart (start the application by
>> specifying the previous application id). It will initialize all operators,
>> load the checkpointed state (which includes the offsets) from HDFS, and
>> continue running from that state. The only limitation is that you cannot
>> easily tell where the current offsets are. Hope this answers your question.
>>
>> Regards,
>> Siyuan
>>
>>
>> On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli <
>> raja.aravapa...@target.com> wrote:
>>
>>>
>>> Thanks Siyuan.
>>>
>>> So, to confirm: Apex is not storing the offset status at any location,
>>> the way Storm stores it in ZooKeeper?
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: "hsy...@gmail.com" <hsy...@gmail.com>
>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>>> Date: Monday, June 6, 2016 at 6:42 PM
>>>
>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>> Subject: Re: kafka offset commit
>>>
>>> Hey Raja,
>>>
>>> For 0.8, you have to implement the OffsetManager interface on your own. The
>>> updateOffsets method is called in the application master every time it gets
>>> updated offsets from each physical partition, and the offsets passed to that
>>> method are committed offsets. So you can safely save these offsets into
>>> ZooKeeper (the 0.8.2 client has an API to do that) or any other datastore,
>>> such as a DB or HDFS. You also have to implement the loadInitialOffsets
>>> method to load back the offsets you want to start from.
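A minimal sketch of such an implementation, under stated assumptions. The two method names (`loadInitialOffsets`, `updateOffsets`) come from the thread, but the interface below is a local, hypothetical stand-in: in Malhar the offset maps are, as far as I know, keyed by a Kafka partition class rather than a string, so check the real signatures before adapting this. Offsets are persisted to a local properties file as a simple substitute for ZooKeeper, a DB, or HDFS; the class names, file location, and `"topic:partition"` key format are all assumptions.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for Malhar's 0.8 OffsetManager: the method names follow
// the thread, but partitions are keyed here by a plain "topic:partition" string.
interface SimpleOffsetManager {
  Map<String, Long> loadInitialOffsets();
  void updateOffsets(Map<String, Long> committedOffsets);
}

// Persists committed offsets to a local properties file, standing in for
// ZooKeeper, a DB, or HDFS. A human-readable file means offsets can be
// inspected or edited by hand before a restart.
class FileOffsetManager implements SimpleOffsetManager {
  private final String fileName; // a String (not a Path) so the field stays serializable

  FileOffsetManager(String fileName) {
    this.fileName = fileName;
  }

  // Returns the saved offsets, or an empty map if nothing has been saved yet.
  @Override
  public Map<String, Long> loadInitialOffsets() {
    Path file = Paths.get(fileName);
    Map<String, Long> offsets = new HashMap<>();
    if (!Files.exists(file)) {
      return offsets;
    }
    Properties props = new Properties();
    try (Reader in = Files.newBufferedReader(file)) {
      props.load(in);
    } catch (IOException e) {
      throw new RuntimeException("failed to read offsets from " + file, e);
    }
    for (String key : props.stringPropertyNames()) {
      offsets.put(key, Long.parseLong(props.getProperty(key)));
    }
    return offsets;
  }

  // Merges newly committed offsets into the saved set and rewrites the file.
  @Override
  public synchronized void updateOffsets(Map<String, Long> committedOffsets) {
    Map<String, Long> merged = loadInitialOffsets();
    merged.putAll(committedOffsets);
    Properties props = new Properties();
    merged.forEach((key, offset) -> props.setProperty(key, Long.toString(offset)));
    try (Writer out = Files.newBufferedWriter(Paths.get(fileName))) {
      props.store(out, "committed Kafka offsets"); // store() escapes the ':' in keys
    } catch (IOException e) {
      throw new RuntimeException("failed to write offsets to " + fileName, e);
    }
  }
}
```

Rewriting the whole file on each call is tolerable here because, as Siyuan notes, the map holds one number per partition and changes only about once per checkpoint interval; a production version would at least write to a temporary file and rename it, so that a crash mid-write cannot corrupt the saved offsets.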
>>>
>>> You are welcome to contribute a default implementation of OffsetManager
>>> that uses the built-in Kafka offset commit request API!
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli <
>>> raja.aravapa...@target.com> wrote:
>>>
>>>>
>>>> Hi Thomas,
>>>>
>>>> We are still using a 0.8 cluster!!
>>>>
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Thomas Weise <thomas.we...@gmail.com>
>>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>>>> Date: Monday, June 6, 2016 at 5:23 PM
>>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>>> Subject: Re: kafka offset commit
>>>>
>>>> Hi Raja,
>>>>
>>>> Which Kafka version are you using?
>>>>
>>>> With the new 0.9 connector there is no need for the offset manager:
>>>>
>>>>
>>>> https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <
>>>> raja.aravapa...@target.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> Can someone please help me understand where the offsets are
>>>>> stored when consuming with “KafkaSinglePortStringInputOperator”?
>>>>>
>>>>> And how should restarts be handled?
>>>>>
>>>>>
>>>>> I worked with Storm earlier. Storm maintains the offsets in ZooKeeper
>>>>> and keeps a client id for every consumer, with which:
>>>>>
>>>>> - we can see the current offset status for a given partition and
>>>>> modify it using the ZooKeeper CLI
>>>>> - restarts can be handled
>>>>>
>>>>>
>>>>> According to the Apex documentation, OffsetManager can be used to
>>>>> handle restarts effectively, but I couldn't find any examples showing:
>>>>>
>>>>> - how the clientId can be used to retrieve the offset status
>>>>> - how to edit the offsets
>>>>>
>>>>> Can someone please help me find these?
>>>>>
>>>>>
>>>>> Thanks a lot!!
>>>>>
>>>>>
>>>>> -Regards,
>>>>> Raja.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
