If you want round-robin distribution, which gives a uniform load across
all partitions, you can use a StreamCodec like this (provided the number
of partitions is known and static):
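import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

public class CatagoryStreamCodec extends KryoSerializableStreamCodec<Object> {
  private final int nPartitions; // number of downstream partitions (known and static)
  private int n = 0;

  public CatagoryStreamCodec(int nPartitions) {
    this.nPartitions = nPartitions;
  }

  @Override
  public int getPartition(Object in) {
    // Round-robin over 0..nPartitions-1; floorMod stays non-negative if n overflows
    return Math.floorMod(n++, nPartitions);
  }
}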
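A quick standalone check of the round-robin counter, outside Apex. The
class name RoundRobinCheck is hypothetical; it mimics only the counter
logic of the codec above, not Apex's own tuple routing. It tallies how
many of 100 tuples land on each of 20 partitions, and it shows why a
plain % on an int counter is risky: once the counter wraps past
Integer.MAX_VALUE the remainder goes negative, which is not a valid
partition, while Math.floorMod keeps the result in range.

```java
import java.util.Arrays;

// Hypothetical harness: reproduces only the codec's round-robin counter.
final class RoundRobinCheck {
  private final int nPartitions;
  private int n;

  RoundRobinCheck(int nPartitions, int start) {
    this.nPartitions = nPartitions;
    this.n = start;
  }

  // Same rule as the codec's getPartition(): next partition in turn.
  int next() {
    return Math.floorMod(n++, nPartitions);
  }

  // Counts how many of `tuples` consecutive calls land on each partition.
  int[] tally(int tuples) {
    int[] counts = new int[nPartitions];
    for (int i = 0; i < tuples; i++) {
      counts[next()]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // 100 tuples over 20 partitions: every partition gets exactly 5.
    System.out.println(Arrays.toString(new RoundRobinCheck(20, 0).tally(100)));
    // Plain % after overflow: (Integer.MAX_VALUE + 1) % 20 == -8
    System.out.println((Integer.MAX_VALUE + 1) % 20);
  }
}
```

Keep in mind that round-robin ignores the tuple's content, so tuples of
the same category can land on different partitions; it balances load but
does not keep each category on a single partition.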
If you want certain category names to go to certain partitions, you can
create that mapping within the StreamCodec (map category names to
integers in the range 0..nPartitions-1) and, for each tuple, look up the
category name in the map and return the corresponding value.
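One way that mapping could look, as a sketch under assumptions: the
hypothetical CategoryPartitionMap class below assigns a known list of
category names to partitions 0..nPartitions-1 in turn and looks each name
up per tuple. It is plain Java so the lookup logic can be tried on its
own; inside a real codec you would call it from getPartition(Object),
e.g. with the tuple's getName(). The fallback for unknown or null names
is an assumption, not part of the suggestion above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper holding the category-to-partition mapping that a
// StreamCodec's getPartition() could consult.
final class CategoryPartitionMap {
  private final Map<String, Integer> partitionOf = new HashMap<>();
  private final int nPartitions;

  // Assigns categories to partitions 0..nPartitions-1 in turn, so each
  // partition gets an equal share (give or take one) of the categories.
  CategoryPartitionMap(List<String> categories, int nPartitions) {
    this.nPartitions = nPartitions;
    for (int i = 0; i < categories.size(); i++) {
      partitionOf.put(categories.get(i), i % nPartitions);
    }
  }

  // Returns the mapped partition; null goes to partition 0 and unknown
  // names fall back to a non-negative hash (both assumptions).
  int getPartition(String category) {
    if (category == null) {
      return 0;
    }
    Integer p = partitionOf.get(category);
    return p != null ? p : Math.floorMod(category.hashCode(), nPartitions);
  }
}
```

Spreading categories evenly only spreads load evenly if the categories
carry similar tuple volumes; with ~100 categories of very different
sizes, the assignment would need to account for the expected volume of
each category.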
Ram
On Fri, Oct 14, 2016 at 1:17 PM, Sunil Parmar <[email protected]>
wrote:
> We're using a StreamCodec to distribute the data consistently across the
> operator partitions so it can be processed in parallel. Our requirement is
> to serialize processing of the data based on a particular tuple attribute,
> let's call it 'catagory_name'. To process different category names in
> parallel, we've written our stream codec as follows.
>
> public class CatagoryStreamCodec extends KryoSerializableStreamCodec<Object>
> {
>
>   private static final long serialVersionUID = -687991492884005033L;
>
>   @Override
>   public int getPartition(Object in) {
>     try {
>       InputTuple tuple = (InputTuple) in;
>       String partitionKehy = tuple.getName();
>       if (partitionKehy != null) {
>         return partitionKehy.hashCode();
>       }
>     } catch (ClassCastException e) {
>       // not an InputTuple; fall through to the default partition
>     }
>     return 0;
>   }
> }
>
> It works as expected, but we observed an uneven distribution across
> partitions when we ran this in production with 20 partitions of the
> operator that follows the codec in the DAG:
>
> - Some operator instances didn't process any data.
> - Some operator instances processed as many tuples as all the others
> combined.
>
>
> Questions:
>
> - Is the getPartition method supposed to return the actual partition, or
> just some lower bits that are used to decide the partition?
> - The number of partitions is set in the application properties and can
> vary between deployments or environments. Is it best practice to use that
> property in the stream codec?
> - Is there a recommended hash function that gives consistent variation in
> the lower bits when the data has little variety? We have ~100+ categories
> and I'm thinking of having 10+ operator partitions.
>
>
> Thanks,
> Sunil
>