Yes Cody! You can forget about the CSVs; they are just temporary sources for verifying things. Regarding the "do something on the GroupedStream" part, below is what I am proceeding with:
...
.groupBy(new Fields(...))
.partitionAggregate(new Fields(...), new PartitionAggregator(), new Fields("batch"))
.each(new Fields("batch"), new BatchProcessor(), new Fields(...))
// Source code of PartitionAggregator:
import java.util.Map;

import storm.trident.operation.Aggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

public class PartitionAggregator implements Aggregator<Batch> {

    private String taskId;

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.taskId = "PartitionAggregator-" + context.getPartitionIndex();
    }

    @Override
    public Batch init(Object batchId, TridentCollector collector) {
        // Start an empty Batch for each Trident batch on this partition.
        return new Batch();
    }

    @Override
    public void aggregate(Batch batch, TridentTuple tuple, TridentCollector collector) {
        batch.add(tuple);
    }

    @Override
    public void complete(Batch batch, TridentCollector collector) {
        // Emit the collected tuples as a single "batch" value.
        collector.emit(new Values(batch));
        System.out.println(">>>> " + taskId + " emitted batch of size " + batch.size() + " <<<<");
    }

    @Override
    public void cleanup() {
        // No resources to release.
    }
}
// Source code of Batch:
import java.util.ArrayList;

import storm.trident.tuple.TridentTuple;

// A simple list of tuples, emitted downstream as a single value.
public class Batch extends ArrayList<TridentTuple> {

    @Override
    public String toString() {
        return "Batch [size:" + size() + "]";
    }
}
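For completeness: the BatchProcessor referenced in the .each(...) step above isn't shown in this thread, so the following is only a hypothetical sketch of its shape; the per-tuple processing logic is a placeholder.

// Hypothetical sketch; the real BatchProcessor isn't shown in this thread.
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

public class BatchProcessor extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // The "batch" field carries the Batch emitted by PartitionAggregator.complete().
        Batch batch = (Batch) tuple.getValue(0);
        for (TridentTuple buffered : batch) {
            // Placeholder processing: emit the first value of each buffered tuple.
            collector.emit(new Values(buffered.getValue(0)));
        }
    }
}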
This approach works fine for me, but I would still like to hear about any other suggested approaches that could achieve the lowest possible latency.
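One latency-related note worth checking: since complete() emits the Batch object inside a tuple, registering the class with Kryo may avoid Storm falling back on slower Java serialization for it. A minimal sketch of the topology config (assuming default Storm 0.9.x settings):

import backtype.storm.Config;

Config conf = new Config();
// Unregistered classes can fall back on Java serialization
// (topology.fall.back.on.java.serialization), which is slower than Kryo.
conf.registerSerialization(Batch.class);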
On Wednesday, 7 May 2014 4:29 AM, Cody A. Ray <[email protected]> wrote:
Can you tell us more about this use case? I don't really understand it fully, but given what you've said so far, I might create a Trident topology something like this:
>
>TridentTopology topology = new TridentTopology();
>topology.newStream("spout1", spout)
>        .each(new Fields("request_id"), new CsvReader(),
>              new Fields("csv_field1", "csv_field2", "csv_fieldN"))
>        .groupBy(new Fields("csv_field1"))
>        .... do something on the GroupedStream
>StormTopology stormTopology = topology.build();
>
>public class CsvReader extends BaseFunction {
>    @Override
>    public void execute(TridentTuple tuple, TridentCollector collector) {
>        long requestId = tuple.getLong(0);
>        // How the requestId maps to a CSV file is unknown here; this
>        // path scheme is only a placeholder.
>        File csv = new File("/data/requests/" + requestId + ".csv");
>        try (BufferedReader reader = new BufferedReader(new FileReader(csv))) {
>            String line;
>            while ((line = reader.readLine()) != null) {
>                // emit one tuple per line with all the fields
>                String[] fields = line.split(",");
>                collector.emit(new Values((Object[]) fields));
>            }
>        } catch (IOException e) {
>            throw new RuntimeException("Failed to read " + csv, e);
>        }
>    }
>}
>
>(Trident makes working with batches a lot easier. :)
>
>In general though, I'm not sure where you're getting the CSV files. I don't
>think reading CSV files off of the worker nodes' disks directly would be a
>good practice in Storm. It'd probably be better if your spouts emitted the
>data themselves or something.
>
>-Cody
>
>On Tue, May 6, 2014 at 1:13 AM, Kiran Kumar <[email protected]> wrote:
>
>Hi Padma,
>>
>>Firstly, thanks for responding.
>>
>>Here is how I am defining my topology conceptually:
>>
>>- The spout waits for a request signal.
>>- Once the spout gets a signal, it generates a request_id and broadcasts it to 10 CSV reader bolts.
>>- The 10 CSV reader bolts each read a CSV file line by line and emit the tuples.
>>- Now (this is where I need technical/syntactical suggestions) I need to batch up the tuples from all 10 CSV reader bolts on specified fields.
>>- Finally, the batched tuples are processed by the final bolts.
>>
>>What I need is a technical approach.
>>On Tuesday, 6 May 2014 11:10 AM, padma priya chitturi
>><[email protected]> wrote:
>>
>>Hi,
>>>
>>>You can define your spouts and bolts so that the input streams read by the spouts are grouped on specified fields and processed by specific bolts. This way, you can form batches of the input stream.
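>>>A minimal sketch of that wiring in plain Storm (the spout/bolt class names and the grouping field are placeholders, not from this thread):
>>>
>>>import backtype.storm.topology.TopologyBuilder;
>>>import backtype.storm.tuple.Fields;
>>>
>>>TopologyBuilder builder = new TopologyBuilder();
>>>builder.setSpout("request-spout", new RequestSpout());
>>>// Broadcast each request to all CSV reader instances.
>>>builder.setBolt("csv-reader", new CsvReaderBolt(), 10)
>>>       .allGrouping("request-spout");
>>>// fieldsGrouping sends tuples with equal values of "group_field"
>>>// to the same task, so each batching bolt sees its whole group.
>>>builder.setBolt("batcher", new BatchingBolt())
>>>       .fieldsGrouping("csv-reader", new Fields("group_field"));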
>>>
>>>On Tue, May 6, 2014 at 11:02 AM, Kiran Kumar <[email protected]>
>>>wrote:
>>>
>>>Hi,
>>>>
>>>>Can anyone suggest a topology that makes batches of the input stream on specified fields, so that each batch is forwarded to a function that processes it?
>>>>
>>>>Regards,
>>>>Kiran Kumar Dasari.
>
>--
>Cody A. Ray, LEED AP
>[email protected]