We're using Stream codec to consistently / parallel processing of the data 
across the operator partitions. Our requirement is to serialize processing of 
the data based on particular tuple attribute let's call it 'catagory_name' . In 
order to achieve the parallel processing of different category names we're 
written our stream codec as following.

   public class CatagoryStreamCodec extends KryoSerializableStreamCodec<Object> 

private static final long serialVersionUID = -687991492884005033L;


    public int getPartition(Object in) {

        try {

        InputTuple tuple = (InputTuple) in;

String partitionKehy = tuple.getName();

        if(partitionKehy != null) {

return partitionKehy.hashCode();




It's working as expected but we observed inconsistent partitions when we run 
this in production env with 20 partitioner of the operator following the codec 
in the dag.

  *   Some operator instance didn't process any data
  *   Some operator instance process as many tuples as combined everybody else

Questions :

  *   getPartition method supposed to return the actual partition or just some 
lower bit used for deciding partition ?
  *   Number of partitions is known to application properties and can vary 
between deployments or environments. Is it best practice to use that property 
in the stream codec ?
  *   Any recommended hash function for getting consistent variations in the 
lower bit with less variety of data. we've ~100+ categories and I'm thinking to 
have 10+ operator partitions.


Reply via email to