Hi All,
Thanks for your prompt reply.
Please find the below code.
------------------------------------------------------------------------------------
[CODE]
builder.setSpout(spoutid, kafkaspout, 5);
builder.setBolt("tweetStream", new ActivityBolt(),
3).shuffleGrouping(spoutid);
builder.setBolt("tweetBolt", new HBStreamTweetBolt("Tweet"),
5).fieldsGrouping("tweetStream", new Fields("tweet"));
builder.setBolt("reTweetBolt", new HBStreamReTweetBolt("ReTweet"),
5).fieldsGrouping("tweetStream", new Fields("reTweet"));
builder.setBolt("replyTweetBolt", new HBStreamReplyTweetBolt("ReplyTweet"),
3).fieldsGrouping("tweetStream", new Fields("replyTweet"));
builder.setBolt("twitterUserLBolt", new HBStreamUserBolt("Twitter_User"),
5).fieldsGrouping("tweetBolt", new
Fields("user")).fieldsGrouping("reTweetBolt", new
Fields("user")).fieldsGrouping("replyTweetBolt", new Fields("user"));
builder.setBolt("tweetMediaURLBolt", new
HBStreamTweetMediaURLBolt("TweetMediaURL"),
3).fieldsGrouping("twitterUserLBolt", new Fields("tweetMediaURL"));
builder.setBolt("tweetHashTagsBolt", new
HBStreamTweetHashtagsBolt("TweetHashtags"),
3).fieldsGrouping("tweetMediaURLBolt", new Fields("tweetHashtags"));
builder.setBolt("tweetEntitiesURLBolt", new
HBStreamTweetEntitiesURLBolt("TweetEntities_URL"),
3).fieldsGrouping("tweetHashTagsBolt", new Fields("tweetEntitiesURL"));
builder.setBolt("tweetUsermentionURLBolt", new
HBStreamTweetUserMentionBolt("TweetUserMention"),
3).fieldsGrouping("tweetEntitiesURLBolt", new
Fields("tweetUsermentionURL"));
[CODE]
-----------------------------------------------------------------------------------
*Activity Bolt:*
declarer.declare(new Fields("tweet","reTweet","replyTweet"));
*Inside Execute:*
if(activity.getVerb().equalsIgnoreCase("post") && activity.getInReplyTo()
== null) {
//System.out.println("######### Emitting Tweet #######");
this.collector.emit(new Values(activity,null,null));
}else if(activity.getVerb().equalsIgnoreCase("share")) {
//System.out.println("######### Emitting ReTweet #######");
this.collector.emit(new Values(null,activity,null));
}else if(activity.getVerb().equalsIgnoreCase("post") &&
activity.getInReplyTo() != null) {
//System.out.println("######### Emitting ReplyTweet #######");
this.collector.emit(new Values(null,null,activity));
}
--------------------------------------------------------------------------------------------------------------------------------------------------------
*HBStreamTweet Bolt*
declarer.declare(new Fields("user"));
*Inside Execute method:*
this.collector.emit(new Values(activity));
-----------------------------------------------------------------------------------------------------------------------------------------------------------
Similarly, we are creating fields and emitting activity.
Is something wrong with the fieldgrouping we have implemented?
Please suggest me the best way to implement this.
Thanks & Regards,
*Riyaz*
On Thu, May 29, 2014 at 1:04 AM, P. Taylor Goetz <[email protected]> wrote:
> Fields grouping uses a mod hash function to determine which task to send a
> tuple.
>
> It sounds like there's not enough variety in the field values you are
> grouping such that they are all getting sent to the same task.
>
> Without seeing your code and data I can't tell for sure.
>
> -Taylor
>
> On May 28, 2014, at 5:56 PM, Shaikh Riyaz <[email protected]> wrote:
>
> Hi All,
>
> We running Storm cluster with following servers.
>
> One Nimbus
> Six supervisor with 2 workers each running on 6700 and 6701 ports.
>
> All tuples are going to only one supervisor and only to one worker (6701)
> running on that supervisor.
>
> We have one KafkaSpout and 6 bolts processing the data. We are using
> fieldgrouping to pass tuple from one bolt to another. Each tuple is saving
> some data to HBase.
>
> One of the executor has emitted 609180 tuples and remaining executor has
> emitted 200 tuples as whole.
>
> we have configured our spout and tuples with parallelism hint 5.
>
> Please let me know what might wrong with the configuration.
>
> Thanks in advance.
>
>
> --
> Regards,
>
> Riyaz
>
>
--
Regards,
Riyaz