I'm not sure whether the functionality exists in Kafka 0.8, but there's likely also a console consumer script in your Kafka install's /bin directory you might be able to use.
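In case it helps with the stuck-offset check Erik suggested, here is a rough Python sketch for summarizing the PartitionManager DEBUG lines quoted below. It is not part of Storm or Kafka: the function name `summarize_failures` and the regular expression are my own assumptions, based only on the log format shown in this thread. Grouping by `_emittedToOffset` is just a stand-in for the partition, since the log lines don't name one.

```python
import re
from collections import defaultdict

# Pattern assumed from the storm-kafka PartitionManager DEBUG lines in this thread.
FAIL_RE = re.compile(
    r"failing at offset=(\d+) with _pending\.size\(\)=(\d+) pending and "
    r"_emittedToOffset=(\d+)"
)


def summarize_failures(log_text):
    """Group failing offsets by _emittedToOffset and report the gap to the lowest one."""
    # Mail clients wrap and quote log lines, so drop '>' markers and
    # collapse all whitespace before matching.
    flat = " ".join(log_text.replace(">", " ").split())
    groups = defaultdict(list)
    for offset, _pending, emitted in FAIL_RE.findall(flat):
        groups[int(emitted)].append(int(offset))
    return {
        emitted: {
            "failed": sorted(offsets),
            "lowest": min(offsets),
            "gap": emitted - min(offsets),
        }
        for emitted, offsets in groups.items()
    }
```

You would feed it the text of a worker log (or a pasted excerpt) and look at which offsets keep showing up in "failed"; those are the ones worth inspecting with a consumer.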

2018-03-05 21:46 GMT+01:00 Stig Rohde Døssing <[email protected]>:

> Not sure if this is what you're asking, but take a look at how storm-kafka
> fetches messages:
> https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java#L191
> You can probably do something similar. Also refer to the Kafka 0.8
> documentation at https://kafka.apache.org/082/documentation.html,
> specifically the part about the simple consumer API.
>
> 2018-03-05 19:03 GMT+01:00 Ajeesh <[email protected]>:
>
>> Thanks, I am able to take the offsets but how do I read messages using
>> specific offset in Kafka? I am using Kafka version 0.8.2.1
>>
>> On Mon, Mar 5, 2018, 11:26 PM Stig Rohde Døssing <[email protected]> wrote:
>>
>>> I'm going to assume you're using storm-kafka, and you're asking how to
>>> find the current committed offset.
>>>
>>> You can find the offsets in your Zookeeper. In your Zookeeper install
>>> there's a /bin/zkCli.sh script you can use to poke around in the
>>> Zookeeper node system. The offsets will be somewhere in
>>> /zk-root-from-your-spout-config/id-from-your-spout-config/topic-name/.
>>>
>>> There's a tool to migrate offsets from storm-kafka to storm-kafka-client
>>> here:
>>> https://github.com/apache/storm/blob/master/external/storm-kafka-migration/src/main/java/org/apache/storm/kafka/migration/KafkaSpoutMigration.java
>>> You can very easily modify it to just print the offsets instead (just
>>> delete line 84 and down in the main method). See also the README at
>>> https://github.com/apache/storm/blob/master/external/storm-kafka-migration/README.md
>>>
>>> 2018-03-05 10:43 GMT+01:00 Ajeesh <[email protected]>:
>>>
>>>> How to dump Kafka consumer offset?
>>>>
>>>> On Thu, Mar 1, 2018, 10:25 PM Erik Weathers <[email protected]> wrote:
>>>>
>>>>> Agreed, there have been a number of fixes in the storm-kafka spout
>>>>> that might account for that problem.
>>>>> If you need to debug further on 0.9.x, you should dump the Kafka
>>>>> consumer offsets and see if the topology is getting stuck at some
>>>>> specific offsets. Then examine the data at those offsets using a
>>>>> console consumer to try to infer why the topology would get stuck.
>>>>>
>>>>> - Erik
>>>>>
>>>>> On Wed, Feb 28, 2018 at 2:41 PM Jungtaek Lim <[email protected]> wrote:
>>>>>
>>>>>> Hi Ajeesh,
>>>>>>
>>>>>> Sorry, but that version is really outdated; it was released 3 years
>>>>>> ago. Would you mind upgrading to a recent version, 1.2.1 for example,
>>>>>> and seeing whether it helps?
>>>>>>
>>>>>> Thanks,
>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>
>>>>>> On Wed, Feb 28, 2018 at 9:48 PM, Ajeesh <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Team,
>>>>>>>
>>>>>>> We are facing issues in Storm version 0.9.4: the Storm application
>>>>>>> hangs after processing for 3-4 days. We tried to restart the same
>>>>>>> Storm topology, but it fails within 1-2 minutes after processing
>>>>>>> around 15K-16K tuples. If we decrease the max.spout.pending value,
>>>>>>> it fails after processing only a few tuples.
>>>>>>>
>>>>>>> If we start a new topology with a new Kafka topic, everything works
>>>>>>> fine for 3-4 days. Our daily volume is around 11 million.
>>>>>>>
>>>>>>> Checked the execute latency; it's around 6 ms.
>>>>>>> Checked the worker logs; there are no errors or exceptions.
>>>>>>> The Storm visualization graph shows all nodes in "green" color.
>>>>>>>
>>>>>>> Workflow:
>>>>>>> KafkaSpout->Bolt-1->Bolt-2->Bolt-3->Bolt-4.
>>>>>>>
>>>>>>> Storm configurations:
>>>>>>> No. of workers: 10
>>>>>>> No. of executors: 260
>>>>>>> Max Spout Pending: 50
>>>>>>> No. of KafkaSpout executors: 10
>>>>>>>
>>>>>>> TODO:
>>>>>>> 1. Take a thread dump.
>>>>>>> 2. Is there anything more you need to know about this issue?
>>>>>>>
>>>>>>> Analyzed worker logs in debug mode:
>>>>>>> 2018-02-28T05:12:51.462-0500 s.k.PartitionManager [DEBUG] failing at offset=163348 with _pending.size()=3953 pending and _emittedToOffset=168353
>>>>>>> 2018-02-28T05:12:51.461-0500 s.k.PartitionManager [DEBUG] failing at offset=195116 with _pending.size()=4442 pending and _emittedToOffset=199437
>>>>>>> 2018-02-28T05:12:51.463-0500 s.k.PartitionManager [DEBUG] failing at offset=194007 with _pending.size()=4442 pending and _emittedToOffset=199437
>>>>>>> 2018-02-28T05:12:51.463-0500 s.k.PartitionManager [DEBUG] failing at offset=194700 with _pending.size()=4442 pending and _emittedToOffset=199437
>>>>>>> 2018-02-28T05:12:51.463-0500 s.k.PartitionManager [DEBUG] failing at offset=193891 with _pending.size()=4442 pending and _emittedToOffset=199437
>>>>>>> 2018-02-28T05:12:51.463-0500 s.k.PartitionManager [DEBUG] failing at offset=194455 with _pending.size()=4442 pending and _emittedToOffset=199437
>>>>>>> 2018-02-28T05:12:51.463-0500 s.k.PartitionManager [DEBUG] failing at offset=194632 with _pending.size()=4442 pending and _emittedToOffset=199437
>>>>>>>
>>>>>>> 2018-02-28T05:14:05.241-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x261d81e8b3d003e after 10ms
>>>>>>> 2018-02-28T05:14:05.703-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x361d81df7a80048 after 0ms
>>>>>>> 2018-02-28T05:14:05.703-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x261d81e8b3d003d after 0ms
>>>>>>> 2018-02-28T05:14:05.745-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x161d81df7a30043 after 2ms
>>>>>>> 2018-02-28T05:14:05.775-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x161d81df7a30045 after 3ms
>>>>>>> 2018-02-28T05:14:05.849-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x361d81df7a80044 after 1ms
>>>>>>> 2018-02-28T05:14:05.969-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x161d81df7a30046 after 0ms
>>>>>>> 2018-02-28T05:14:07.067-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x161d81df7a30041 after 11ms
>>>>>>> 2018-02-28T05:14:07.131-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x261d81e8b3d003c after 0ms
>>>>>>> 2018-02-28T05:14:07.135-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x161d81df7a30042 after 0ms
>>>>>>> 2018-02-28T05:14:07.140-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x261d81e8b3d003b after 0ms
>>>>>>> 2018-02-28T05:14:07.150-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x161d81df7a30044 after 0ms
>>>>>>> 2018-02-28T05:14:08.319-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x361d81df7a8004b after 6ms
>>>>>>> 2018-02-28T05:14:08.938-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x261d81e8b3d0042 after 1ms
>>>>>>> 2018-02-28T05:14:08.977-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x161d81df7a30047 after 10ms
>>>>>>> 2018-02-28T05:14:08.985-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x261d81e8b3d0043 after 6ms
>>>>>>> 2018-02-28T05:14:08.985-0500 o.a.z.ClientCnxn [DEBUG] Got ping response for sessionid: 0x261d81e8b3d0044 after 7ms
>>>>>>>
