All,
Thank you for your time. I am learning more and more about this framework
and it is cool.
I am experimenting with a custom stream grouping that would only send
streams to a bolt if the severity value is Critical. I can use a bolt to
filter a stream but I would rather use a stream grouping for flexibility.
It is not working, no tuples are being sent to the printer bolt.
Here is my custom grouping:
public class CriticalStreamGrouping implements CustomStreamGrouping {
int numTasks = 0;
@Override
public void prepare(WorkerTopologyContext workerTopologyContext,
GlobalStreamId globalStreamId,
List<Integer> integers) {
numTasks = integers.size();
}
@Override
public List<Integer> chooseTasks(int i, List<Object> objects) {
List<Integer> boltsID = new ArrayList();
if (objects.size()>0){
// getting the tuple from the stream, I hope
String currentSeverity = objects.get(0).toString();
// testing for the desired value in the tuple
if (currentSeverity.contains("Critical")) {
boltsID.add(0); //add correct id
}
}
return boltsID;
}
}
and this is the topology
// this topology is testing the new custom stream grouping
builder.setSpout("spout", new DeviceSpout());
builder.setBolt("prime", new PrimeNumberBolt())
.shuffleGrouping("spout");
// this is testing the new custom grouping
builder.setBolt("printerBolt", new PrinterBolt())
.customGrouping("prime", new CriticalStreamGrouping());
//
David Novogrodsky
[email protected]
http://www.linkedin.com/in/davidnovogrodsky