Re: balanced of Stream Codec

2016-10-14 Thread Ashwin Chandra Putta
Sunil,

For key based partitioning, the getPartition method is supposed to return a
consistent integer representing the key for partitioning. Typically the
java hashCode of the key. The tuples are then routed based on the integer
and looking at its lower bits on the mask (number of lower bits) based on
number of downstream partitions.

A partition key with only 100 possible values may not load balance properly
and will most likely introduce skew, as in your case. It is recommended to
use a key that will generate a high range of values for the return value.

Regards,
Ashwin.

On Fri, Oct 14, 2016 at 1:17 PM, Sunil Parmar 
wrote:

> We’re using Stream codec to consistently / parallel processing of the data
> across the operator partitions. Our requirement is to serialize processing
> of the data based on particular tuple attribute let’s call it
> ‘catagory_name’ . In order to achieve the parallel processing of different
> category names we’re written our stream codec as following.
>
>public class CatagoryStreamCodec extends 
> KryoSerializableStreamCodec
> {
>
> private static final long serialVersionUID = -687991492884005033L;
>
>
>
> @Override
>
> public int getPartition(Object in) {
>
> try {
>
> InputTuple tuple = (InputTuple) in;
>
> String partitionKehy = tuple.getName();
>
> if(partitionKehy != null) {
>
> return partitionKehy.hashCode();
>
> }
>
> }
>}
>
> It’s working as expected *but *we observed inconsistent partitions when
> we run this in production env with 20 partitioner of the operator following
> the codec in the dag.
>
>- Some operator instance didn’t process any data
>- Some operator instance process as many tuples as combined everybody
>else
>
>
> Questions :
>
>- getPartition method supposed to return the actual partition or just
>some lower bit used for deciding partition ?
>- Number of partitions is known to application properties and can vary
>between deployments or environments. Is it best practice to use that
>property in the stream codec ?
>- Any recommended hash function for getting consistent variations in
>the lower bit with less variety of data. we’ve ~100+ categories and I’m
>thinking to have 10+ operator partitions.
>
>
> Thanks,
> Sunil
>



-- 

Regards,
Ashwin.


java.io.IOException: All datanodes DatanodeInfoWithStorage

2016-10-14 Thread chiranjeevi vasupilli
Hi Team,

Can you please let me know the reason, when we will get this kind of
exception. In my application containers getting killled with below
excpetion.


java.lang.RuntimeException: java.io.IOException: All datanodes
DatanodeInfoWithStorage[147.22.192.229:1004,DS-fa5a41a5-c953-477e-a5d9-3dc499d5baee,DISK]
are bad. Aborting...
at
com.datatorrent.lib.io.fs.AbstractFileOutputOperator.endWindow(AbstractFileOutputOperator.java:948)
at
com.datatorrent.stram.engine.GenericNode.processEndWindow(GenericNode.java:141)


-- 
ur's
chiru