I am using a custom grouping that implements CustomStreamGrouping interface.
Below is my chooseTasks method implementation:
@Override
public List< Integer > chooseTasks( int taskId, List< Object > values ) {
List< Integer > boltIds = new ArrayList();
if( values.size() > 0 ) {
long valueToGroup = Long.parseLong( values.get( 1 ).toString() );
long mod = valueToGroup % numberOfTasks;
boltIds.add( ( int ) mod );
}
return boltIds;
}
This is how i use in my flux yaml file under streams:
- name: "SampleStream"
from: "Spout01"
to: "Bolt01"
grouping:
type: CUSTOM
customClass:
className: "com.knowesis.sift.core.HashGrouping"
The issue with this is: the data from Spout01 is not sent to Bolt01, but to
some other bolt in the topology.
I printed the mod value and the numberOfTasks (numberOfTasks =
targetTasks.size()) - that seems ok to me.
Is the chooseTasks method that i have correct ? is there any documentation
on the same that explains how to override this method ?
Regards,
Rajeev.