It'll really come down to tuning the system properly, in my opinion. There are a few good articles out there, such as this one: https://github.com/infochimps-labs/big_data_for_chimps/blob/master/25-storm%2Btrident-tuning.asciidoc
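To show where those knobs actually live, here's a rough sketch of a Trident topology wired up with the numbers from the rules of thumb and worked example below. The FixedBatchSpout and MemoryMapState are only stand-ins so the snippet compiles on its own; in practice you'd plug in your OpaqueTridentKafkaSpout and whatever state backend you actually persist to.

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Debug;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;

public class TuningSketch {
    public static void main(String[] args) throws Exception {
        // Stand-in spout: replace with your OpaqueTridentKafkaSpout.
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("word"), 100,
                new Values("foo"), new Values("bar"), new Values("baz"));
        spout.setCycle(true);

        Config conf = new Config();
        conf.setNumWorkers(3); // 3 supervisor machines, 1 worker per machine

        TridentTopology topology = new TridentTopology();
        topology.newStream("kafka-spout", spout)
                .parallelismHint(3)                    // spout parallelism = number of kafka partitions
                .shuffle()                             // repartition so the next hint doesn't fold back into the spout
                .each(new Fields("word"), new Debug()) // stand-in for your real processing functions
                .parallelismHint(24)                   // 4 cores * 2 executors/core * 3 workers
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                .parallelismHint(3);                   // persistence parallelism = number of workers

        StormSubmitter.submitTopology("tuning-sketch", conf, topology.build());
    }
}

The shuffle() between the two hints matters here: as far as I know, Trident applies a parallelismHint to everything back to the previous repartition operation, so without it the 24 would also get applied to the spout.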
The rules of thumb that I found most useful (for a kafka spout and persistentAggregate store):

- Number of workers should be a multiple of the number of machines
- Number of kafka partitions should be a multiple of spout parallelism
- Parallelism should be a multiple of the number of workers
- Persistence parallelism should be equal to the number of workers for best cache efficiency and lowest bulk-request overhead

For example:

- 3 supervisor machines with 1 topology, so 3 workers.
- 3 kafka partitions, so spout parallelism 3.
- My supervisor machines have 4 cores and I want 2 executors per core, so parallelism 24 (4 cores * 2 executors * 3 workers).
- Again 3 workers, so persistence parallelism is 3 too.

-Cody

On Tue, May 6, 2014 at 9:49 PM, Kiran Kumar <[email protected]> wrote:

> Yes Cody! You can forget about the CSV(s); they are temporary sources just
> to verify things. Regarding the "do something on the GroupedStream", below
> is what I am proceeding with:
>
> ...
> .groupBy(new Fields(...))
> .partitionAggregate(new Fields(...), new PartitionAggregator(), new Fields("batch"))
> .each(new Fields("batch"), new BatchProcessor(), new Fields(...))
>
>
> // Source code of PartitionAggregator:
>
> import java.util.Map;
>
> import storm.trident.operation.Aggregator;
> import storm.trident.operation.TridentCollector;
> import storm.trident.operation.TridentOperationContext;
> import storm.trident.tuple.TridentTuple;
> import backtype.storm.tuple.Values;
>
> public class PartitionAggregator implements Aggregator<Batch> {
>     private String taskId;
>
>     @Override
>     public void prepare(Map conf, TridentOperationContext context) {
>         this.taskId = "PartitionAggregator-" + context.getPartitionIndex();
>     }
>
>     @Override
>     public Batch init(Object batchId, TridentCollector collector) {
>         return new Batch();
>     }
>
>     @Override
>     public void aggregate(final Batch batch, TridentTuple tuple, final TridentCollector collector) {
>         batch.add(tuple);
>     }
>
>     @Override
>     public void complete(Batch batch, TridentCollector collector) {
>         collector.emit(new Values(batch));
>         System.out.println(">>>>" + taskId + " emitted batch of size " + batch.size() + "<<<<");
>     }
>
>     @Override
>     public void cleanup() {
>         // nothing to clean up
>     }
> }
>
>
> // Source code of Batch:
>
> import java.util.ArrayList;
>
> import storm.trident.tuple.TridentTuple;
>
> public class Batch extends ArrayList<TridentTuple> {
>
>     @Override
>     public String toString() {
>         return "Batch [size:" + size() + "]";
>     }
> }
>
>
> The above works fine for me, but I still want to check whether there are
> other suggested approaches to achieve the lowest possible latency.
>
> On Wednesday, 7 May 2014 4:29 AM, Cody A. Ray <[email protected]> wrote:
>
> Can you tell us more about this use case? I don't really understand, but
> given what you've said so far, I might create a trident topology something
> like this:
>
> TridentTopology topology = new TridentTopology();
> topology.newStream("spout1", spout)
>         .each(new Fields("request_id"), new CsvReader(),
>               new Fields("csv_field1", "csv_field2", "csv_fieldN"))
>         .groupBy(new Fields("csv_field1"));
>         // ... do something on the GroupedStream, then:
> StormTopology stormTopology = topology.build();
>
> public class CsvReader extends BaseFunction {
>     public CsvReader() {
>     }
>
>     @Override
>     public void execute(TridentTuple tuple, TridentCollector collector) {
>         long requestId = tuple.getLong(0);
>         // do something with this requestId to figure out which CSV file to read ???
>         /* PSEUDOCODE
>         for (each line in the CSV) {
>             // emit one tuple per line with all the fields
>             collector.emit(new Values(line[0], line[1], line[N]));
>         }
>         */
>     }
> }
>
> Trident makes working with batches a lot easier. :)
>
> In general though, I'm not sure where you're getting the CSV files. I
> don't think reading CSV files off of the worker nodes' disks directly
> would be good practice in Storm. It'd probably be better if your spouts
> emitted the data themselves or something.
>
>
> -Cody
>
> On Tue, May 6, 2014 at 1:13 AM, Kiran Kumar <[email protected]> wrote:
>
> Hi Padma,
>
> Firstly, thanks for responding.
>
> Here is how I am defining my topology conceptually:
>
> - The spout waits for a request signal.
> - Once the spout gets a signal, it generates a request_id and broadcasts
>   that request_id to 10 CSV reader bolts.
> - The 10 CSV reader bolts read their CSV files line by line and emit those
>   tuples, respectively.
> - Now (this is the place where I need a technical/syntactical suggestion)
>   I need to batch up the tuples from all 10 CSV reader bolts on specified
>   fields.
> - Finally, the batched tuples will be processed by the final bolts.
>
> What I need is a technical approach.
>
> On Tuesday, 6 May 2014 11:10 AM, padma priya chitturi
> <[email protected]> wrote:
>
> Hi,
>
> You can define spouts and bolts in such a way that the input streams read
> by the spouts are grouped on specified fields and processed by specific
> bolts. This way, you can make batches of the input stream.
>
>
> On Tue, May 6, 2014 at 11:02 AM, Kiran Kumar <[email protected]> wrote:
>
> Hi,
>
> Can anyone suggest a topology that makes batches of the input stream on
> specified fields, so that each batch is forwarded to a function that
> processes it?
>
> Regards,
> Kiran Kumar Dasari.
>
>
> --
> Cody A. Ray, LEED AP
> [email protected]
> 215.501.7891


--
Cody A. Ray, LEED AP
[email protected]
215.501.7891
