All,

Thank you for your time.  I am learning more and more about this framework
and it is cool.

I am experimenting with a custom stream grouping that would only send
streams to a bolt if the severity value is Critical.  I can use a bolt to
filter a stream but I would rather use a stream grouping for flexibility.
 It is not working, no tuples are being sent to the printer bolt.

Here is my custom grouping:
public  class CriticalStreamGrouping implements CustomStreamGrouping {
    int numTasks = 0;
    @Override
    public void prepare(WorkerTopologyContext workerTopologyContext,
GlobalStreamId globalStreamId,
                        List<Integer> integers) {
        numTasks = integers.size();
    }

    @Override
    public List<Integer> chooseTasks(int i, List<Object> objects) {

        List<Integer> boltsID = new ArrayList();
        if (objects.size()>0){
            // getting the tuple from the stream, I hope
            String currentSeverity = objects.get(0).toString();
            // testing for the desired value in the tuple
            if (currentSeverity.contains("Critical")) {
                boltsID.add(0); //add correct id
            }
        }
        return boltsID;
    }

}

and this is the topology
        // this topology is testing the new custom stream grouping
        builder.setSpout("spout", new DeviceSpout());
        builder.setBolt("prime", new PrimeNumberBolt())
                .shuffleGrouping("spout");
        // this is testing the new custom grouping
        builder.setBolt("printerBolt", new PrinterBolt())
                .customGrouping("prime", new CriticalStreamGrouping());
        //


David Novogrodsky
[email protected]
http://www.linkedin.com/in/davidnovogrodsky

Reply via email to