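Because you didn't tell Trident how to distribute tuples from 1 partition to 4
partitions. Without a repartitioning operation such as shuffle() between merge()
and each(), the spouts, the merge and ETLFunction end up in the same group of
operations, so the parallelismHint(4) after each() is applied to the spouts as
well. Adding shuffle() after merge() starts a new partition group.
I think the following code should work: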
TridentState state = topology
        .merge(merge_streams)
        .parallelismHint(1)   // applies to the spouts and the merge
        .shuffle()            // repartition: spread tuples from 1 partition to the 4 below
        .each(new Fields("spoutName", "log"), new ETLFunction(), new Fields("time_index", "count"))
        .parallelismHint(4)   // applies to ETLFunction only
        .groupBy(new Fields("time_index"))
        .persistentAggregate(new RedisState.Factory(StateType.NON_TRANSACTIONAL),
                new Fields("time_index", "count"), new Count(), new Fields("agg"))
        .parallelismHint(6);  // applies to groupBy + persistentAggregate
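To illustrate what the shuffle() step does, here is a small self-contained Java simulation. It is not Storm code, and the class ShuffleSketch and its methods are hypothetical. One upstream partition, standing in for the merged spouts running with parallelism 1, hands its tuples to 4 downstream partitions in turn. Round-robin is a simplified stand-in for Storm's shuffle grouping, which also spreads tuples roughly evenly across downstream tasks.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation (not Storm code): one upstream partition
 * (the merged spouts, parallelism 1) redistributes its tuples to
 * 4 downstream ETLFunction partitions. Round-robin stands in for
 * Storm's shuffle grouping.
 */
public class ShuffleSketch {

    // Send each tuple to the next downstream partition in turn.
    public static List<List<String>> shuffle(List<String> tuples, int downstream) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < downstream; i++) {
            partitions.add(new ArrayList<>());
        }
        int next = 0;
        for (String t : tuples) {
            partitions.get(next).add(t);
            next = (next + 1) % downstream;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // 8 tuples emitted by the single upstream (merged spout) partition.
        List<String> merged = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            merged.add("log-" + i);
        }
        List<List<String>> parts = shuffle(merged, 4);
        for (int p = 0; p < parts.size(); p++) {
            System.out.println("ETL partition " + p + ": " + parts.get(p));
        }
    }
}
```

Running main prints two tuples per partition: partition 0 gets log-0 and log-4, partition 1 gets log-1 and log-5, and so on.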
And yes, groupBy + persistentAggregate really does run in parallel across 6 executors.
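A similar hypothetical sketch for the groupBy step: every tuple with the same time_index is routed to the same one of 6 partitions, so each partition's state (one RedisState instance each) counts a disjoint set of keys. Math.floorMod(key.hashCode(), 6) is a simplified stand-in, not Storm's exact fields-grouping hash, and GroupBySketch, partitionFor and countByKey are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Storm's implementation): groupBy("time_index")
 * sends all tuples with the same key to the same partition, and each
 * partition keeps its own counts, mimicking persistentAggregate with Count.
 */
public class GroupBySketch {

    // Simplified key-to-partition mapping; Storm's real hash differs.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // One count map per partition, standing in for one state instance each.
    public static List<Map<String, Long>> countByKey(List<String> keys, int numPartitions) {
        List<Map<String, Long>> states = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            states.add(new HashMap<>());
        }
        for (String key : keys) {
            Map<String, Long> state = states.get(partitionFor(key, numPartitions));
            state.merge(key, 1L, Long::sum);
        }
        return states;
    }

    public static void main(String[] args) {
        List<String> timeIndexes = Arrays.asList("12:00", "12:01", "12:00", "12:02", "12:01", "12:00");
        List<Map<String, Long>> states = countByKey(timeIndexes, 6);
        for (int p = 0; p < states.size(); p++) {
            System.out.println("partition " + p + ": " + states.get(p));
        }
    }
}
```

Because a key always maps to the same partition, no two partitions ever hold a count for the same time_index, which is what lets the 6 executors aggregate independently.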
From: IvyTang [mailto:[email protected]]
Sent: February 21, 2014 12:14
To: [email protected]
Subject: Trident spout merge parallelismHint
Here is my code:
private static String[] stream_names = new String[] { "spout_0", "spout_1" };

for (String stream_name : stream_names) {
    Stream stream = topology.newStream(stream_name, new WordSpout(stream_name, 10000))
            .parallelismHint(1);
    merge_streams.add(stream);
}

TridentState state = topology
        .merge(merge_streams)
        .parallelismHint(1)
        .each(new Fields("spoutName", "log"), new ETLFunction(), new Fields("time_index", "count"))
        .parallelismHint(4)
        .groupBy(new Fields("time_index"))
        .persistentAggregate(new RedisState.Factory(StateType.NON_TRANSACTIONAL),
                new Fields("time_index", "count"), new Count(), new Fields("agg"))
        .parallelismHint(6);
I expected each spout_name to run as only 1 spout instance, but in fact each
spout_name opens 4 spout instances.
It seems that the parallelismHint(1) after topology.newStream and
topology.merge has no effect; the parallelismHint(4) applies to both the spouts
and ETLFunction.
I just want each spout_name to run as a single spout instance. How do I configure that?
Also, parallelismHint(6) does indeed create 6 RedisState instances. Does that mean
groupBy + persistentAggregate runs in parallel across 6 executors?
Thanks!
--
Best regards,
Ivy Tang