Re: how to make KafkaSource consume the existing messages
Hi Attila, Hi Chris,

Many thanks for your replies and good comments. Yes, besides a new group.id and "auto.offset.reset=earliest", "enable.auto.commit=false" is also needed to make KafkaSource read messages from the beginning. After I added the following lines to the Flume agent configuration and restarted Flume, it worked as I expected:

tier1.sources.source1.kafka.consumer.group.id = group-new2
tier1.sources.source1.kafka.consumer.auto.offset.reset = earliest
tier1.sources.source1.kafka.consumer.enable.auto.commit = false

Thanks a lot for the help!
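To confirm what offsets the new Flume consumer group has actually committed, the consumer-groups tool bundled with Kafka can help. This is only a sketch: the broker address is a placeholder, and the exact flags vary by Kafka version.

```shell
# Describe committed offsets and lag for the group used above.
# Flag names vary across Kafka versions; some 0.9/0.10 releases
# require --new-consumer or --zookeeper instead of --bootstrap-server.
bin/kafka-consumer-groups.sh --bootstrap-server broker:9092 \
  --describe --group group-new2
```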
Re: how to make KafkaSource consume the existing messages
Hi,

Which version of Kafka are you using? Off the top of my head it should be:

tier2.sources.source1.kafka.auto.offset.reset = earliest

Of course, changing the group ID (or, on an older version of Kafka, removing the corresponding offset znode from ZooKeeper) ought to do the trick.

-- Chris Horrocks
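For older Kafka versions whose consumers store offsets in ZooKeeper, the znode removal Chris mentions might look like the following. The host, group, and topic names are placeholders, and the exact znode layout depends on the Kafka version.

```shell
# Open a ZooKeeper shell (the script ships with the Kafka distribution).
bin/zookeeper-shell.sh zk-host:2181

# Inspect the offsets stored for the old consumer group (paths are illustrative).
ls /consumers/test-consumer-group/offsets

# Remove the offset znodes for one topic so that group starts over.
rmr /consumers/test-consumer-group/offsets/my-topic
```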
Re: how to make KafkaSource consume the existing messages
Hi,

One more thing: if you switch to the new group.id and would like to maintain the read-from-beginning behaviour every time Flume restarts, then you might try setting enable.auto.commit to false. Again, Kafka normally won't store the events indefinitely.

Cheers,
Attila
Re: how to make KafkaSource consume the existing messages
For the record, cc dev@.
Re: how to make KafkaSource consume the existing messages
Hi,

auto.offset.reset aims to handle failure scenarios in which Flume has lost track of its offsets. When Flume successfully consumes messages, it also commits the last processed offset; after a failure, consumption resumes from that last committed value, and the reset policy only applies when no valid committed offset exists. I don't think always starting from offset "zero" would be valuable (it would result in a lot of duplicates), so I assume you would like a recovery scenario. What you can do is set the consumer group.id to something new, so that, if Kafka still has the messages (you can check this with the command-line Kafka consumer and the --from-beginning argument, as Kafka by default purges them periodically), Flume will reset the offset to the effective beginning, since offsets are stored per group.id.

Quoting the Kafka docs (http://kafka.apache.org/documentation#newconsumerconfigs):

auto.offset.reset - What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
anything else: throw exception to the consumer.

Cheers,
Attila
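The retention check described above can be sketched with the stock console consumer. The broker address and topic name here are placeholders; older Kafka versions use --zookeeper instead of --bootstrap-server.

```shell
# Verify that Kafka still retains the old messages (retention may have purged them).
# If messages are printed, a fresh group.id with auto.offset.reset=earliest will see them.
bin/kafka-console-consumer.sh --bootstrap-server broker:9092 \
  --topic my-topic --from-beginning
```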
how to make KafkaSource consume the existing messages
Hi,

I used KafkaSource to consume messages from Kafka, but found that only new messages were received, not the old existing ones. I tried using a new consumer group and changing the parameter auto.offset.reset from "latest" to "earliest", but this does not work:

tier2.sources.source1.kafka.consumer.group.id = test-consumer-group-new
tier2.sources.source1.kafka.consumer.auto.offset.reset = earliest

Does anyone know how to make KafkaSource consume the existing messages? Thanks a lot for any advice!

Best Regards,

Wang Ping (王苹)
InfoSphere BigInsights, CDL
Email: wpw...@cn.ibm.com Phone: (8610)82453448 Mobile: (86)17090815725
Address: Ring Bldg. No.28 Building, ZhongGuanCun Software Park, No.8 Dong Bei Wang West Road, Haidian District, Beijing, P.R. China 100193