Try reducing the "max spout pending" value for the spout. It is likely that messages are just sitting at the spout and timing out there before getting a chance to be processed. Set it to some low value, such as 1 or 5, and then experiment with higher values to improve throughput while staying clear of the failures.
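
To make the reasoning above concrete, here is a rough back-of-the-envelope sketch. It assumes a very simplified model in which the bolt drains in-flight tuples at a steady rate, so the last tuple waits roughly (tuples in flight) / (tuples per second). The 1526 figure is the batch size from the log quoted below and 30 seconds is Storm's default `topology.message.timeout.secs`; the bolt rate of 40 tuples/sec and both function names are made up purely for illustration.

```python
import math


def worst_case_wait_secs(in_flight: int, tuples_per_sec: float) -> float:
    """Seconds the last of `in_flight` tuples waits if the bolt drains them at a steady rate."""
    return in_flight / tuples_per_sec


def max_safe_pending(tuples_per_sec: float, timeout_secs: float) -> int:
    """Largest number of in-flight tuples whose worst-case wait fits within the timeout."""
    return math.floor(tuples_per_sec * timeout_secs)


TIMEOUT_SECS = 30   # Storm's default topology.message.timeout.secs
BOLT_RATE = 40.0    # hypothetical bolt throughput, tuples per second
BATCH = 1526        # messages fetched per batch in the quoted log

# Compare an uncapped batch against a small cap of 5 pending tuples.
for pending in (BATCH, 5):
    wait = worst_case_wait_secs(pending, BOLT_RATE)
    status = "times out" if wait > TIMEOUT_SECS else "ok"
    print(f"{pending:>5} in flight -> {wait:6.2f}s worst-case wait ({status})")

print("largest cap that stays under the timeout:",
      max_safe_pending(BOLT_RATE, TIMEOUT_SECS))
```

In Storm the cap itself is the `topology.max.spout.pending` setting (`Config.setMaxSpoutPending` in Java), and it applies per spout task. Measure the real bolt rate before trusting any number from a model like this.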

----------
Andrey Yegorov

On Sun, Jul 27, 2014 at 12:03 AM, Anuj Agrawal <[email protected]> wrote:

> Hi Harsha,
>
> Attached more screenshots with a little more details and system stats
> enabled. And, a sample of logs for 10 minutes.
>
> Can't figure out from these.
>
> Thanks,
> Anuj
>
>
> On Sun, Jul 27, 2014 at 1:34 AM, Harsha <[email protected]> wrote:
>
>>
>> Ok I assume you are using KafkaSpout from storm/external without any
>> changes to the code. From the UI screenshot it looks like your bolt is
>> acknowledging messages. Enable system stats on that topology page (it's at
>> the bottom of the page) and check if the ackers are running without any
>> errors. I guess the reason it might be happening is that your spout is not
>> receiving acks for all the messages processed by your bolt, hence failing
>> them, and your kafka offset won't move forward because of these failures. I
>> will also look for any kafka errors for your consumer id. You are running 4
>> topologies that are reading from kafka and these topologies are reading
>> from different topics and/or have their own unique consumerid + topicids.
>>
>> On Sat, Jul 26, 2014, at 11:16 AM, Anuj Agrawal wrote:
>>
>> Hi Harsha,
>>
>> I don't see any errors in UI or logs. I just see failure counts
>> increasing. See screenshot attached.
>>
>> Logs are filled up with lines of the kind that I showed earlier -
>> committing offsets and fetching messages. The offsets sometimes decrease by
>> 1 and sometimes move forward by a very small number. However, approx 1500
>> messages are fetched every time.
>>
>> I do see new messages being inserted into kafka. Have verified that.
>>
>> Thanks,
>> Anuj
>>
>>
>> On Sat, Jul 26, 2014 at 10:37 PM, Harsha <[email protected]> wrote:
>>
>>
>> Hi Anuj,
>> can you also send the errors that you are seeing in the UI and also
>> in logs. Are you seeing new messages inserted into your kafka topics, just
>> to make sure there aren't issues with your kafka.
>> -Harsha
>>
>>
>> On Sat, Jul 26, 2014, at 05:47 AM, Anuj Agrawal wrote:
>>
>> I am running 4 topologies on a storm cluster each with one bolt and one
>> kafka spout. Of these, 3 are showing a high number of failures (in UI) in
>> the spout itself. I looked at the logs and found that the offset isn't just
>> moving (in fact, at times it is reduced by one). Sample log for one of the
>> partitions below:
>>
>>
>> anuj.agrawal@server-ingestion1:/var/log/storm$ grep "2014-07-26 17:5" worker-6704.log | grep "partition=24"
>> 2014-07-26 17:50:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:50:27 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:50:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:50:57 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:51:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:51:27 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:51:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:51:57 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:52:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:52:27 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:52:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:52:57 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:53:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:53:27 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:53:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:53:57 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:54:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:54:27 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:54:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:54:57 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:55:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:55:27 s.k.PartitionManager [INFO] Committed offset 96435 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:55:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:55:57 s.k.ZkState [INFO] Writing /server/cp-kafka/AndroidAppEventIngestion/partition_24 the data {topology={id=8fba6b24-e1cd-4476-91a6-bb493a0e7c87, name=AndriodAppEventIngestion}, offset=96434, partition=24, broker={host=server-kafka5.local, port=9092}, topic=AndroidApp}
>> 2014-07-26 17:55:57 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:56:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:56:27 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:56:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:56:57 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:57:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:57:27 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:57:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:57:57 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:58:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:58:27 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:58:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:58:57 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:59:27 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:59:27 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> 2014-07-26 17:59:57 s.k.PartitionManager [INFO] Committing offset for Partition{host=server-kafka5.local:9092, partition=24}
>> 2014-07-26 17:59:57 s.k.PartitionManager [INFO] Committed offset 96434 for Partition{host=server-kafka5.local:9092, partition=24} for topology: 8fba6b24-e1cd-4476-91a6-bb493a0e7c87
>> anuj.agrawal@server-ingestion1:/var/log/storm$
>> anuj.agrawal@server-ingestion1:/var/log/storm$
>> anuj.agrawal@server-ingestion1:/var/log/storm$
>> anuj.agrawal@server-ingestion1:/var/log/storm$
>> anuj.agrawal@server-ingestion1:/var/log/storm$ grep "2014-07-26 17:5" worker-6704.log | grep ":24"
>> *2014-07-26 17:55:27 s.k.PartitionManager [INFO] Fetched 1526 messages from Kafka: server-kafka5.local:24*
>> *2014-07-26 17:55:27 s.k.PartitionManager [INFO] Added 1526 messages from Kafka: server-kafka5.local:24 to internal buffers*
>> anuj.agrawal@server-ingestion1:/var/log/storm$ grep "2014-07-26 17:5" worker-6704.log | grep "partition_24"
>> *2014-07-26 17:55:57 s.k.ZkState [INFO] Writing /server/cp-kafka/AndroidAppEventIngestion/partition_24 the data {topology={id=8fba6b24-e1cd-4476-91a6-bb493a0e7c87, name=AndriodAppEventIngestion}, offset=96434, partition=24, broker={host=server-kafka5.local, port=9092}, topic=AndroidApp}*
>> anuj.agrawal@server-ingestion1:/var/log/storm$
>> anuj.agrawal@server-ingestion1:/var/log/storm$
>>
>> I am unable to figure out what is going on here. Can anyone please help?
>>
>>
>> Email had 1 attachment:
>>
>> - Screen Shot 2014-07-26 at 11.44.04 PM.png
>> - 155k (image/png)
>>
