Hi,

I had started work on an offset manager for Kafka 0.8.x some time back, but it was left midway. This implementation used Kafka topics to store offsets (similar to the 0.9 implementation):
https://github.com/apache/apex-malhar/pull/156

If the community is using it, I can incorporate the review comments and make it Malhar-ready.

Thanks,
Dev

On Mon, Jun 6, 2016 at 9:55 PM, hsy...@gmail.com <hsy...@gmail.com> wrote:

> Hi Raja,
>
> Yes, I think if you implement the interface and set it as a property of the input operator, it should serve the purpose.
>
> I don't think it would be a bottleneck, since it is just a list of numbers and it is only updated once per checkpoint interval.
>
> Regards,
> Siyuan
>
> On Mon, Jun 6, 2016 at 5:43 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:
>
>> Thanks a lot Siyuan. It helped me understand better!
>>
>> So, can you please confirm: if I implement the OffsetManager interface, will it be used to load the initial starting position and to update the offset status at some interval?
>>
>> Will the application latency be greatly affected if I use HDFS for storage?
>>
>> Thank you very much.
>>
>> Regards,
>> Raja.
>>
>> From: "hsy...@gmail.com" <hsy...@gmail.com>
>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>> Date: Monday, June 6, 2016 at 7:13 PM
>> To: "users@apex.apache.org" <users@apex.apache.org>
>> Subject: Re: kafka offset commit
>>
>> Raja,
>>
>> Not exactly. Apex actually stores offsets as part of the operator state, and the operator state is checkpointed internally and periodically (in HDFS by default). For more details, you can read this:
>> https://www.datatorrent.com/blog/blog-introduction-to-checkpoint/
>>
>> That said, offsets are stored in HDFS along with the rest of the operator's state so that the operator can recover from any system failure.
>> Also, in Apex you can do a stateful restart (start the application by specifying the previous application ID). It will initialize all operators, load the checkpointed state (including the offsets) from HDFS, and continue running from that state.
>> The only limitation is that you cannot easily tell where the current offsets are. Hope this answers your question.
>>
>> Regards,
>> Siyuan
>>
>> On Mon, Jun 6, 2016 at 4:57 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:
>>
>>> Thanks Siyuan.
>>>
>>> So, to confirm: Apex is not storing the offset status at any location, the way Storm stores it in ZooKeeper?
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: "hsy...@gmail.com" <hsy...@gmail.com>
>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>>> Date: Monday, June 6, 2016 at 6:42 PM
>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>> Subject: Re: kafka offset commit
>>>
>>> Hey Raja,
>>>
>>> For 0.8, you have to implement the OffsetManager interface on your own. updateOffsets will be called in the application master every time it gets updated offsets from each physical partition, and the offsets you see in that method are committed offsets. So you can safely save these offsets into ZooKeeper (the 0.8.2 client has an API for that) or any other datastore, such as a DB or HDFS. You also have to implement the method loadInitialOffsets to load back the offsets you want.
>>>
>>> You are welcome to contribute a default implementation of OffsetManager using the built-in Kafka offset commit request API!
>>>
>>> Regards,
>>> Siyuan
>>>
>>> On Mon, Jun 6, 2016 at 3:36 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> We are still using a 0.8 cluster!
>>>>
>>>> Regards,
>>>> Raja.
>>>>
>>>> From: Thomas Weise <thomas.we...@gmail.com>
>>>> Reply-To: "users@apex.apache.org" <users@apex.apache.org>
>>>> Date: Monday, June 6, 2016 at 5:23 PM
>>>> To: "users@apex.apache.org" <users@apex.apache.org>
>>>> Subject: Re: kafka offset commit
>>>>
>>>> Hi Raja,
>>>>
>>>> Which Kafka version are you using?
>>>>
>>>> With the new 0.9 connector, there is no need for the offset manager:
>>>>
>>>> https://github.com/apache/apex-malhar/tree/master/kafka/src/main/java/org/apache/apex/malhar/kafka
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>> On Mon, Jun 6, 2016 at 3:06 PM, Raja.Aravapalli <raja.aravapa...@target.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Can someone please help me understand where the offsets will be stored when consuming with “*KafkaSinglePortStringInputOperator*”?
>>>>>
>>>>> And how should restarts be handled?
>>>>>
>>>>> I worked with Storm earlier. Storm maintains the offsets in ZooKeeper, and a client ID is maintained for every consumer, with which
>>>>>
>>>>> - we can see the current offset status for a given partition and modify it as well using the ZooKeeper CLI
>>>>> - restarts can be handled
>>>>>
>>>>> From the Apex documentation, I can see that OffsetManager can be used to handle restarts effectively, but I couldn’t find any examples to refer to for:
>>>>>
>>>>> how the client ID can be used to retrieve the offset status,
>>>>> and the ability to edit the offsets, etc.
>>>>>
>>>>> Can someone please help me find these?
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>> -Regards,
>>>>> Raja.
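For readers still on a 0.8 cluster, here is a minimal sketch of what Siyuan describes: an offset manager whose updateOffsets persists the committed offsets it is handed, and whose loadInitialOffsets reads them back when the application starts. This is not the Malhar API. The SimpleOffsetManager interface, the FileOffsetManager class, and the "topic:partition" string keys are hypothetical; only the two method names come from the thread, and the real 0.8 OffsetManager keys offsets by its own partition type, so check its source before adapting this. A local properties file stands in for ZooKeeper, a DB, or HDFS (updateOffsets runs in the application master, so a real deployment would want shared storage such as HDFS).

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-in for the 0.8 operator's OffsetManager: only the two
// method names come from the thread; the "topic:partition" String keys are
// an assumption, not the Malhar signature.
interface SimpleOffsetManager {
  Map<String, Long> loadInitialOffsets();

  void updateOffsets(Map<String, Long> committedOffsets);
}

// Persists committed offsets to a properties file. The local file is a
// stand-in for ZooKeeper, a database, or HDFS.
class FileOffsetManager implements SimpleOffsetManager {
  private final Path file;

  FileOffsetManager(Path file) {
    this.file = file.toAbsolutePath();
  }

  // Returns the stored offsets, or an empty map on the very first run so the
  // operator can fall back to its default starting position.
  @Override
  public Map<String, Long> loadInitialOffsets() {
    Map<String, Long> offsets = new HashMap<>();
    if (!Files.exists(file)) {
      return offsets;
    }
    Properties props = new Properties();
    try (Reader in = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
      props.load(in);
    } catch (IOException e) {
      throw new UncheckedIOException("cannot read offsets from " + file, e);
    }
    for (String key : props.stringPropertyNames()) {
      offsets.put(key, Long.parseLong(props.getProperty(key).trim()));
    }
    return offsets;
  }

  // Merges the newly committed offsets into what is already stored, writes
  // the result to a temp file, then renames it over the old file. On POSIX
  // filesystems the rename replaces the old file atomically.
  @Override
  public void updateOffsets(Map<String, Long> committedOffsets) {
    Map<String, Long> merged = loadInitialOffsets();
    merged.putAll(committedOffsets);
    Properties props = new Properties();
    merged.forEach((key, offset) -> props.setProperty(key, Long.toString(offset)));
    try {
      Path tmp = Files.createTempFile(file.getParent(), "offsets", ".tmp");
      try (Writer out = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
        props.store(out, "committed Kafka offsets");
      }
      Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      throw new UncheckedIOException("cannot write offsets to " + file, e);
    }
  }
}
```

Because the stored offsets are a plain-text properties file, they can be inspected or hand-edited before a restart, which roughly covers the "see and modify offsets" workflow Raja describes for Storm and ZooKeeper. Merging in updateOffsets keeps offsets for partitions that were not part of the latest update, in case the caller reports only a subset of partitions.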