Please take a look at the sample application at: https://github.com/amberarrow/samples/tree/master/custom-codec
It uses the StreamCodec that you provided where tuples have the form "i^j^hello" where i runs sequentially through the natural numbers and j (the key) cycles through the range (1..5). When I run this application on the sandbox: Tuples with key 1 or 5 are all coming to one operator, for example: 2016-07-09 19:49:37,068 DEBUG myapexapp.TestPartition process - 0: tuple = 10210^5^hello, operator id = 3 2016-07-09 19:49:37,068 DEBUG myapexapp.TestPartition process - 1: tuple = 10211^1^hello, operator id = 3 Tuples with key 2 are all coming to one operator, for example: 2016-07-09 19:46:54,558 DEBUG myapexapp.TestPartition process - 0: tuple = 6962^2^hello, operator id = 4 2016-07-09 19:46:54,558 DEBUG myapexapp.TestPartition process - 1: tuple = 6967^2^hello, operator id = 4 Tuples with key 3 or 4 are all coming to one operator, for example: 2016-07-09 19:43:01,058 DEBUG myapexapp.TestPartition process - 0: tuple = 2293^3^hello, operator id = 2 2016-07-09 19:43:01,059 DEBUG myapexapp.TestPartition process - 1: tuple = 2294^4^hello, operator id = 2 So, could you elaborate on what you're seeing and how it differs from what you're expecting ? Ram On Thu, Jul 7, 2016 at 3:37 PM, Raja.Aravapalli <[email protected]> wrote: > > > Hi Sandesh, > > > Below is the way I am setting codec: > > SsnCheckCodec ssnCodec = new SsnCheckCodec(3); > dag.setInputPortAttribute(ssnCheck.input, PortContext.STREAM_CODEC, ssnCodec); > > > Also, one quick question, > > When some operator is running in multiple instances… will the records > received by each individual instance will be processed in the same order > they arrived to that instance of operator ? > > Regards, > Raja. > > From: Sandesh Hegde <[email protected]> > Reply-To: "[email protected]" <[email protected]> > Date: Thursday, July 7, 2016 at 4:29 PM > To: "[email protected]" <[email protected]> > Subject: Re: hashing > > Where are you setting the StreamCodec? > > Here is one method, > dag.setInputPortAttribute(campaignProcessor.input, > Context.PortContext.STREAM_CODEC, new MyCodec()); > > > On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta < > [email protected]> wrote: > >> Raja, >> >> Is MyOperator the actual name of the operator within your dag? If not, >> can you replace it with the actual name of the operator and try. >> >> Regards, >> Ashwin. >> >> On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli < >> [email protected]> wrote: >> >>> >>> >>> Also, to share some info on the parititoner I am using: >>> >>> I am using Stateless Partitioner with below code: >>> >>> <property> >>> <name>dt.operator.MyOperator.attr.PARTITIONER</name> >>> <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value> >>> </property> >>> >>> >>> Thanks. >>> >>> Regards, >>> Raja. >>> >>> From: "Raja.Aravapalli" <[email protected]> >>> Date: Thursday, July 7, 2016 at 3:20 PM >>> To: "[email protected]" <[email protected]> >>> Subject: hashing >>> >>> >>> Hi, >>> >>> I have an operator, which is running in 3 instances I.e partions… And, I >>> want all the records with same key, here my key is "String" type, to be >>> transferred to same instance/partition. >>> >>> But, I am unable to achieve this with my below codec. >>> >>> >>> import com.datatorrent.lib.codec.KryoSerializableStreamCodec; >>> >>> public class MyCodec extends KryoSerializableStreamCodec<String> { >>> >>> @Override >>> public int getPartition(String tuple) { >>> >>> String[] toSplit = tuple.split("\\^"); >>> String exId = toSplit[1]; >>> >>> return exId.hashCode(); >>> >>> } >>> >>> } >>> >>> >>> >>> Any guidance please… >>> >>> >>> Thanks a lot. >>> >>> >>> Regards, >>> Raja. >>> >> >> >> >> -- >> >> Regards, >> Ashwin. >> >
