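Because you didn't tell Trident how to distribute tuples from 1 partition to 4 partitions. Without a repartitioning step, the spouts, the merge and the ETLFunction stay in the same partitions, so they all end up with the parallelismHint(4).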
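To make "distribute tuples from 1 partition to 4 partitions" concrete, here is a toy, plain-Java model. It is not Storm code, and the class and method names are made up. It assumes a shuffle behaves like Storm's documented shuffle grouping: tuples are spread randomly but evenly over the downstream partitions. Here that is approximated by dealing tuples round-robin over a partition order that is reshuffled after every round.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Toy model only, not Storm/Trident code. It mimics what a shuffle repartition
// does conceptually: tuples that all sit in one upstream partition are dealt to
// N downstream partitions randomly but evenly (round-robin over a partition
// order that is reshuffled after every round). Requires downstream > 0.
class ShuffleSketch {
    static List<List<String>> shuffle(List<String> upstream, int downstream, long seed) {
        Random random = new Random(seed);
        List<List<String>> partitions = new ArrayList<>();
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < downstream; i++) {
            partitions.add(new ArrayList<String>());
            order.add(i);
        }
        int next = order.size(); // forces a shuffle before the first tuple
        for (String tuple : upstream) {
            if (next == order.size()) {
                Collections.shuffle(order, random); // new random order for this round
                next = 0;
            }
            partitions.get(order.get(next++)).add(tuple);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> upstream = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            upstream.add("log-" + i); // 20 tuples, all coming from one partition
        }
        List<List<String>> out = shuffle(upstream, 4, 42L);
        for (int p = 0; p < out.size(); p++) {
            System.out.println("partition " + p + ": " + out.get(p));
        }
    }
}
```

With 20 tuples and 4 partitions every partition receives exactly 5 tuples; which ones depends on the seed.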

I think the code below should work (note the added .shuffle() after the merge):

TridentState state = topology
        .merge(merge_streams)
        .parallelismHint(1)
        // repartition, so the operations after this can use their own parallelism
        .shuffle()
        .each(new Fields("spoutName", "log"), new ETLFunction(),
              new Fields("time_index", "count"))
        .parallelismHint(4)
        .groupBy(new Fields("time_index"))
        .persistentAggregate(new RedisState.Factory(StateType.NON_TRANSACTIONAL),
              new Fields("time_index", "count"), new Count(), new Fields("agg"))
        .parallelismHint(6);



And yes, the groupBy + persistentAggregate really does run in parallel on 6 executors.
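To show why the extra .shuffle() changes the spout parallelism, here is a hypothetical, simplified model of how hints map onto groups of operations. Its rule is an assumption inferred from the behavior described in this thread, not taken from Trident's source: operations between two repartitioning steps run together, and that group gets the largest parallelismHint set inside it. All names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of how parallelism hints map to components.
// ASSUMPTION (inferred from the behavior reported in this thread, not from
// Trident's source): operations between two repartitioning steps run together,
// and that group gets the largest parallelismHint set anywhere inside it.
class ParallelismSketch {
    static final class Op {
        final String name;
        final int hint;            // 0 means "no parallelismHint given"
        final boolean repartition; // true for shuffle(), groupBy(), ...

        Op(String name, int hint, boolean repartition) {
            this.name = name;
            this.hint = hint;
            this.repartition = repartition;
        }
    }

    static Op op(String name, int hint) { return new Op(name, hint, false); }

    static Op repartition(String name) { return new Op(name, 0, true); }

    // Effective parallelism of each group of operations, in pipeline order.
    static List<Integer> segmentParallelism(List<Op> pipeline) {
        List<Integer> result = new ArrayList<>();
        int current = 1; // assumed default when a group has no hint
        for (Op o : pipeline) {
            if (o.repartition) {
                result.add(current); // close the current group
                current = 1;
            } else {
                current = Math.max(current, o.hint);
            }
        }
        result.add(current); // close the last group
        return result;
    }

    public static void main(String[] args) {
        // Original topology: no repartition between the spouts and ETLFunction.
        List<Op> original = Arrays.asList(
                op("spouts", 1), op("merge", 1), op("ETLFunction", 4),
                repartition("groupBy"), op("persistentAggregate", 6));
        // Suggested fix: shuffle() right after the merge.
        List<Op> withShuffle = Arrays.asList(
                op("spouts", 1), op("merge", 1), repartition("shuffle"),
                op("ETLFunction", 4), repartition("groupBy"), op("persistentAggregate", 6));
        System.out.println(segmentParallelism(original));    // prints [4, 6]
        System.out.println(segmentParallelism(withShuffle)); // prints [1, 4, 6]
    }
}
```

In this model the spouts only drop back to a parallelism of 1 once a repartitioning step separates them from the ETLFunction.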



From: IvyTang [mailto:[email protected]]
Sent: February 21, 2014 12:14
To: [email protected]
Subject: Trident spout merge parallelismHint



Here is my code:

private static String[] stream_names = new String[] { "spout_0", "spout_1" };

for (String stream_name : stream_names) {
    Stream stream = topology.newStream(stream_name, new WordSpout(stream_name, 10000))
            .parallelismHint(1);
    merge_streams.add(stream);
}



TridentState state = topology
        .merge(merge_streams)
        .parallelismHint(1)
        .each(new Fields("spoutName", "log"), new ETLFunction(),
              new Fields("time_index", "count"))
        .parallelismHint(4)
        .groupBy(new Fields("time_index"))
        .persistentAggregate(new RedisState.Factory(StateType.NON_TRANSACTIONAL),
              new Fields("time_index", "count"), new Count(), new Fields("agg"))
        .parallelismHint(6);



I expected each spout_name to run as only 1 spout, but in fact each spout_name opens 4 spouts.

It seems that the parallelismHint(1) after topology.newStream and topology.merge has no effect; the parallelismHint(4) applies to both the spout and the ETLFunction().



I just want each spout_name to run as a single spout. How do I configure that?



And parallelismHint(6) does indeed create 6 RedisState instances. Does that mean the groupBy + persistentAggregate runs in parallel on 6 executors?
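With parallelismHint(6), the grouped stream is split over 6 partitions, each with its own RedisState instance. The toy model below is not Trident code and its names are made up. It assumes groupBy picks a partition from a hash of time_index, in the spirit of Storm's fields grouping; the real hash function may differ. The point it shows is that every tuple with the same time_index lands in the same partition, so each state instance counts a disjoint subset of the keys.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model, not Trident code. ASSUMPTION: groupBy(new Fields("time_index"))
// sends a tuple to partition floorMod(hash(time_index), partitions); the real hash
// may differ. Equal keys always land in the same partition, and each partition
// keeps its own counts, like one RedisState instance per partition.
class GroupBySketch {
    static int partitionFor(String timeIndex, int partitions) {
        return Math.floorMod(timeIndex.hashCode(), partitions);
    }

    // Applies a Count() per time_index separately inside every partition.
    static List<Map<String, Long>> countByPartition(List<String> timeIndexes, int partitions) {
        List<Map<String, Long>> states = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            states.add(new TreeMap<String, Long>());
        }
        for (String key : timeIndexes) {
            states.get(partitionFor(key, partitions)).merge(key, 1L, Long::sum);
        }
        return states;
    }

    public static void main(String[] args) {
        // Made-up time_index values, purely for illustration.
        List<String> timeIndexes = Arrays.asList(
                "2014022112", "2014022113", "2014022112", "2014022114", "2014022112");
        List<Map<String, Long>> states = countByPartition(timeIndexes, 6);
        for (int p = 0; p < states.size(); p++) {
            System.out.println("state " + p + ": " + states.get(p));
        }
    }
}
```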





Thanks!





--

Best regards,



Ivy Tang




