Which is the bolt that is failing? Your setup seems strange to me. You use fields grouping and have three bolts that subscribe to a router bolt, but of the three fields you emit, you set two to null. This means, I assume, that every tweet is also delivered to the retweet and reply bolts with a null value in their grouping field, so it always gets routed to the same retweet bolt task and the same reply bolt task. Why not emit directly to three separate streams?

On May 29, 2014 8:53 AM, "Shaikh Riyaz" wrote:
> Hi All,
>
> Please help me to solve this problem.
>
> It is still not resolved; the server is going down due to high load.
>
> Thanks & Regards,
> Riyaz
>
>
> On Thu, May 29, 2014 at 2:05 AM, Shaikh Riyaz wrote:
>
>> Here is the configuration.
>>
>> Config conf = new Config();
>> conf.setNumWorkers(10);
>> conf.setMaxSpoutPending(80000);
>> conf.setMaxTaskParallelism(6);
>> //conf.setDebug(true);
>> conf.put(Config.NIMBUS_HOST, "x.x.x.x");
>> conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
>> conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, 64 * 1024);
>>
>> Do I have to use shuffle grouping instead of fields grouping?
>>
>>
>> On Thu, May 29, 2014 at 2:01 AM, P. Taylor Goetz wrote:
>>
>>> That seems okay. How many workers are you assigning to the topology?
>>>
>>> -Taylor
>>>
>>> On May 28, 2014, at 6:15 PM, Shaikh Riyaz wrote:
>>>
>>> Hi All,
>>>
>>> Thanks for your prompt reply.
>>>
>>> Please find the code below.
>>>
>>> ------------------------------------------------------------------------
>>> [CODE]
>>> builder.setSpout(spoutid, kafkaspout, 5);
>>> builder.setBolt("tweetStream", new ActivityBolt(), 3)
>>>         .shuffleGrouping(spoutid);
>>> builder.setBolt("tweetBolt", new HBStreamTweetBolt("Tweet"), 5)
>>>         .fieldsGrouping("tweetStream", new Fields("tweet"));
>>> builder.setBolt("reTweetBolt", new HBStreamReTweetBolt("ReTweet"), 5)
>>>         .fieldsGrouping("tweetStream", new Fields("reTweet"));
>>> builder.setBolt("replyTweetBolt", new HBStreamReplyTweetBolt("ReplyTweet"), 3)
>>>         .fieldsGrouping("tweetStream", new Fields("replyTweet"));
>>> builder.setBolt("twitterUserLBolt", new HBStreamUserBolt("Twitter_User"), 5)
>>>         .fieldsGrouping("tweetBolt", new Fields("user"))
>>>         .fieldsGrouping("reTweetBolt", new Fields("user"))
>>>         .fieldsGrouping("replyTweetBolt", new Fields("user"));
>>> builder.setBolt("tweetMediaURLBolt", new HBStreamTweetMediaURLBolt("TweetMediaURL"), 3)
>>>         .fieldsGrouping("twitterUserLBolt", new Fields("tweetMediaURL"));
>>> builder.setBolt("tweetHashTagsBolt", new HBStreamTweetHashtagsBolt("TweetHashtags"), 3)
>>>         .fieldsGrouping("tweetMediaURLBolt", new Fields("tweetHashtags"));
>>> builder.setBolt("tweetEntitiesURLBolt", new HBStreamTweetEntitiesURLBolt("TweetEntities_URL"), 3)
>>>         .fieldsGrouping("tweetHashTagsBolt", new Fields("tweetEntitiesURL"));
>>> builder.setBolt("tweetUsermentionURLBolt", new HBStreamTweetUserMentionBolt("TweetUserMention"), 3)
>>>         .fieldsGrouping("tweetEntitiesURLBolt", new Fields("tweetUsermentionURL"));
>>> [CODE]
>>> ------------------------------------------------------------------------
>>>
>>> *Activity Bolt:*
>>>
>>> declarer.declare(new Fields("tweet", "reTweet", "replyTweet"));
>>>
>>> *Inside Execute:*
>>> if (activity.getVerb().equalsIgnoreCase("post") && activity.getInReplyTo() == null) {
>>>     //System.out.println("######### Emitting Tweet #######");
>>>     this.collector.emit(new Values(activity, null, null));
>>> } else if (activity.getVerb().equalsIgnoreCase("share")) {
>>>     //System.out.println("######### Emitting ReTweet #######");
>>>     this.collector.emit(new Values(null, activity, null));
>>> } else if (activity.getVerb().equalsIgnoreCase("post") && activity.getInReplyTo() != null) {
>>>     //System.out.println("######### Emitting ReplyTweet #######");
>>>     this.collector.emit(new Values(null, null, activity));
>>> }
>>>
>>> ------------------------------------------------------------------------
>>>
>>> *HBStreamTweet Bolt*
>>>
>>> declarer.declare(new Fields("user"));
>>>
>>> *Inside Execute method:*
>>> this.collector.emit(new Values(activity));
>>>
>>> ------------------------------------------------------------------------
>>>
>>> Similarly, we are creating fields and emitting activity.
>>>
>>> Is something wrong with the fields grouping we have implemented?
>>>
>>> Please suggest the best way to implement this.
>>>
>>> Thanks & Regards,
>>> *Riyaz*
>>>
>>>
>>> On Thu, May 29, 2014 at 1:04 AM, P. Taylor Goetz wrote:
>>>
>>>> Fields grouping uses a mod hash function to determine which task to
>>>> send a tuple to.
>>>>
>>>> It sounds like there's not enough variety in the field values you are
>>>> grouping on, so they are all getting sent to the same task.
>>>>
>>>> Without seeing your code and data I can't tell for sure.
>>>>
>>>> -Taylor
>>>>
>>>> On May 28, 2014, at 5:56 PM, Shaikh Riyaz wrote:
>>>>
>>>> Hi All,
>>>>
>>>> We are running a Storm cluster with the following servers:
>>>>
>>>> One Nimbus
>>>> Six supervisors with 2 workers each, running on ports 6700 and 6701.
>>>>
>>>> All tuples are going to only one supervisor, and only to one worker
>>>> (6701) running on that supervisor.
>>>>
>>>> We have one KafkaSpout and 6 bolts processing the data. We are using
>>>> fields grouping to pass tuples from one bolt to another. Processing each
>>>> tuple saves some data to HBase.
>>>>
>>>> One of the executors has emitted 609180 tuples, while the remaining
>>>> executors have emitted 200 tuples as a whole.
>>>>
>>>> We have configured our spout and bolts with a parallelism hint of 5.
>>>>
>>>> Please let me know what might be wrong with the configuration.
>>>>
>>>> Thanks in advance.
>>>>
>>>>
>>>> --
>>>> Regards,
>>>>
>>>> Riyaz
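The "mod hash" behaviour Taylor describes, and why a null grouping field creates a hotspot, can be illustrated with a small self-contained Java model. It is a sketch under stated assumptions, not Storm's source: it assumes fields grouping picks a task as hash(grouping values) mod task count, and it uses hypothetical string ids (`activityId`) in place of the thread's `Activity` objects, with every third activity treated as a retweet (`isRetweet`, also hypothetical). In the current single-stream layout every tuple reaches the retweet bolt, and every tuple whose "reTweet" field is null hashes to the same task; sending only retweets on their own stream spreads them across tasks.

```java
import java.util.Arrays;

// Simplified model of fields grouping. ASSUMPTION: task index =
// hash(grouping values) mod numTasks. This mirrors the "mod hash" idea
// from the thread; it is not Storm's actual implementation.
class FieldsGroupingModel {

    // Arrays.hashCode hashes a null element as 0, so a null grouping
    // value always maps to the same task index.
    static int chooseTask(Object[] groupingValues, int numTasks) {
        return Math.floorMod(Arrays.hashCode(groupingValues), numTasks);
    }

    // Hypothetical activity ids; plain strings keep the hashes deterministic.
    static String activityId(int i) {
        return "activity-" + i;
    }

    // Hypothetical mix: every third activity is a retweet.
    static boolean isRetweet(int i) {
        return i % 3 == 1;
    }

    // Current layout: the router emits (tweet, reTweet, replyTweet) on one
    // stream and reTweetBolt groups on "reTweet". Every tuple reaches
    // reTweetBolt; non-retweets carry null in that field.
    static int[] currentLayout(int numActivities, int numTasks) {
        int[] perTask = new int[numTasks];
        for (int i = 0; i < numActivities; i++) {
            Object reTweetField = isRetweet(i) ? activityId(i) : null;
            perTask[chooseTask(new Object[] {reTweetField}, numTasks)]++;
        }
        return perTask;
    }

    // Stream-per-type layout: only retweets are emitted to the retweet
    // stream, grouped on the activity id itself.
    static int[] streamLayout(int numActivities, int numTasks) {
        int[] perTask = new int[numTasks];
        for (int i = 0; i < numActivities; i++) {
            if (isRetweet(i)) {
                perTask[chooseTask(new Object[] {activityId(i)}, numTasks)]++;
            }
        }
        return perTask;
    }

    public static void main(String[] args) {
        int activities = 3000;
        int tasks = 5; // same parallelism hint as reTweetBolt in the thread
        System.out.println("null key -> task " + chooseTask(new Object[] {null}, tasks));
        System.out.println("current layout: " + Arrays.toString(currentLayout(activities, tasks)));
        System.out.println("stream layout:  " + Arrays.toString(streamLayout(activities, tasks)));
    }
}
```

In this model the 2000 non-retweet tuples all land on task 1 of the retweet bolt. In a real topology, the stream-per-type layout would use `declarer.declareStream(streamId, fields)` in the router bolt, `collector.emit(streamId, values)` when emitting, and a subscription such as `shuffleGrouping("tweetStream", streamId)` on each downstream bolt; since these bolts only write to HBase, shuffle grouping may be enough unless per-key ordering matters.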
