Is numberOfTasks size of the array you receive as third argument in your 
prepare? In that case change argument to boltIds.add method to 
targetTasks.get((int) mod);

What you receive in prepare method as the third argument is the list of 
integers that represent all the tasks for the target. And in chooseTasks you 
are expected to pick a task id.

From: Rajeev <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, April 4, 2018 at 7:17 PM
To: "[email protected]" <[email protected]>
Subject: Custom Grouping using flux

I am using a custom grouping that implements CustomStreamGrouping interface.
Below is my chooseTasks method implementation:


@Override
public List< Integer > chooseTasks( int taskId, List< Object > values ) {
List< Integer > boltIds = new ArrayList();
if( values.size() > 0 ) {
long valueToGroup = Long.parseLong( values.get( 1 ).toString() );
long mod = valueToGroup % numberOfTasks;
boltIds.add( ( int ) mod );
}
return boltIds;
}

This is how i use in my flux yaml file under streams:
          - name: "SampleStream"
            from: "Spout01"
            to: "Bolt01"
            grouping:
               type: CUSTOM
               customClass:
                  className: "com.knowesis.sift.core.HashGrouping"


The issue with this is: the data from Spout01 is not sent to Bolt01, but to 
some other bolt in the topology.
I printed the mod value and the numberOfTasks (numberOfTasks = 
targetTasks.size()) - that seems ok to me.

Is the chooseTasks method that i have correct ? is there any documentation on 
the same that explains how to override this method ?

Regards,
Rajeev.

Reply via email to