Yes Cody! You can forget about the CSV(s).. they are temporary sources just to 
ensure the things.. Regarding the "do something on GroupedStream".. below is 
what i am proceeding with

...
.groupBy(new Fields(...))
.partitionAggregate(new Fields(...), new PartitionAggregator(), new 
Fields("batch"))
.each(new Fields("batch"), new BatchProcessor(), new Fields(...))



// Source Code of PartitionAggregator:

import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;

import storm.trident.operation.Aggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.tuple.TridentTuple;
import backtype.storm.tuple.Values;

public class PartitionAggregator implements Aggregator<Batch> {
private String taskId;

@Override
public void prepare(Map conf, TridentOperationContext context) {
this.taskId = "PartitionAggregator-" + context.getPartitionIndex();
}

@Override
public Batch init(Object Id, TridentCollector collector) {
return new Batch();
}

@Override
public void aggregate(final Batch batch, TridentTuple tuple, final 
TridentCollector collector) {
batch.add(tuple);
}

@Override
public void complete(Batch batch, TridentCollector collector) {
collector.emit(new Values(batch));
System.out.println(">>>>" + taskId + " emitted batch of size " + batch.size() + 
"<<<<");
}

@Override
public void cleanup() {
// TODO Auto-generated method stub
}
}



// Source Code of Batch..
import java.util.ArrayList;


import storm.trident.tuple.TridentTuple;

public class Batch extends ArrayList<TridentTuple> {

@Override
public String toString() {
return "Batch [size:" + size() + "]";
}
}



The above works for me fine.. But i still want to check any other suggestible 
approaches to achieve-maximum-possible-low-latency.
On Wednesday, 7 May 2014 4:29 AM, Cody A. Ray <[email protected]> wrote:
 
Can you tell us more about this use case? I don't really understand, but given 
what you've said so far, I might create a trident topology something like this:
>
>
>    TridentTopology topology = new TridentTopology();
>    StormTopology = topology.newStream("spout1", spout)
>        .each(new Fields("request_id"), new CsvReader(), new 
>Fields("csv_field1", "csv_field2", "csv_fieldN"));
>        .groupBy(new Fields("csv_field1"))
>
>        .... do something on the GroupedStream
>        .build();
>
>
>    public class CsvReader extends BaseFunction {
>        public CsvReader() {
>        }
>
>
>        @Override
>        public void execute(TridentTuple tuple, TridentCollector collector) {
>            long requestId = tuple.getLong(0);
>            // do something with this requestId to figure out which CSV file 
>to read ???
>            /* PSEUDOCODE
>            for (each line in the CSV) {
>                 // emit one tuple per line with all the fields
>                collector.emit(new Values(line[0], line[1], line[N]));
>            }
>            */
>        }
>    }
>
>
>(Trident makes working with batches a lot easier. :)
>
>
>
>In general though, I'm not sure where you're getting the CSV files. I don't 
>think reading CSV files off of the worker nodes' disks directly would be a 
>good practice in Storm. It'd probably be better if your spouts emitted the 
>data themselves or something. 
>
>
>
>
>-Cody
>
>On Tue, May 6, 2014 at 1:13 AM, Kiran Kumar <[email protected]> wrote:
>
>Hi Padma,
>>
>>
>>Firstly, thanks for responding.
>>
>>
>>Here is how i am defining my topology conceptually..
>>
>>
>>- Spout waits for a request signal..
>>- once spout got a signal, it generates a request_id and broadcasts that 
>>request_id to 10 csv reader bolts..
>>- 10 csv reader bolts reads csv files line-by-line and emits those tuples, 
>>respectively..
>>- Now (this is the place where i need suggestion in technical/syntactical) i 
>>need to batch up those tuples from all the 10 csv reader bolts on specified 
>>fields..
>>- finally, batch-ed tuples will be processed by final bolts.
>>
>>
>>What i need is a technical approach.
>>On Tuesday, 6 May 2014 11:10 AM, padma priya chitturi 
>><[email protected]> wrote:
>> 
>>Hi,
>>>
>>>
>>> You can define spouts and bolts in  such a way that, input streams read by 
>>>spouts would be grouped on specified fields and these could be processed by 
>>>specific bolts. This way, you could make batches of input stream.
>>>
>>>
>>>
>>>On Tue, May 6, 2014 at 11:02 AM, Kiran Kumar <[email protected]> 
>>>wrote:
>>>
>>>Hi,
>>>>
>>>>
>>>>Can anyone suggest me a topology that makes batches of the input stream on 
>>>>specified fields. so that the batch will be forwarded to a function that 
>>>>processes it.
>>>>
>>>>
>>>>Regards,
>>>>Kiran Kumar Dasari.
>>>
>>>
>>>
>
>
>
>-- 
>
>Cody A. Ray, LEED AP
>[email protected]
>
>
>

Reply via email to