Hi Sandesh,
Below is how I am setting the codec:

SsnCheckCodec ssnCodec = new SsnCheckCodec(3);
dag.setInputPortAttribute(ssnCheck.input, PortContext.STREAM_CODEC, ssnCodec);

Also, one quick question: when an operator is running in multiple instances, will the records received by each individual instance be processed in the same order in which they arrived at that instance?

Regards,
Raja.

From: Sandesh Hegde <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Thursday, July 7, 2016 at 4:29 PM
To: "[email protected]" <[email protected]>
Subject: Re: hashing

Where are you setting the StreamCodec? Here is one method:

dag.setInputPortAttribute(campaignProcessor.input, Context.PortContext.STREAM_CODEC, new MyCodec());

On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta <[email protected]> wrote:

Raja,

Is MyOperator the actual name of the operator within your dag? If not, can you replace it with the actual name of the operator and try?

Regards,
Ashwin.

On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli <[email protected]> wrote:

Also, to share some info on the partitioner I am using: I am using the StatelessPartitioner with the configuration below:

<property>
  <name>dt.operator.MyOperator.attr.PARTITIONER</name>
  <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
</property>

Thanks.

Regards,
Raja.

From: "Raja.Aravapalli" <[email protected]>
Date: Thursday, July 7, 2016 at 3:20 PM
To: "[email protected]" <[email protected]>
Subject: hashing

Hi,

I have an operator that is running in 3 instances, i.e. partitions. I want all records with the same key (my key is of type String) to be sent to the same instance/partition. However, I am unable to achieve this with my codec below.
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

public class MyCodec extends KryoSerializableStreamCodec<String> {

    @Override
    public int getPartition(String tuple) {
        String[] toSplit = tuple.split("\\^");
        String exId = toSplit[1];
        return exId.hashCode();
    }
}

Any guidance please…

Thanks a lot.

Regards,
Raja.

--
Regards,
Ashwin.
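
For illustration, the key-based routing that the codec above is aiming for can be sketched outside the platform. This is a minimal standalone sketch, not Apex code: the class name KeyPartitionSketch, its method names, the sample records, and the low-bit masking rule are all assumptions made for the example. How the platform actually maps the value returned by getPartition to a partition is not shown in this thread.

```java
public class KeyPartitionSketch {

    // Extracts the second '^'-separated field, mirroring getPartition in the thread.
    // Unlike the original, it fails with a clear message when the field is missing.
    public static String extractKey(String tuple) {
        String[] fields = tuple.split("\\^");
        if (fields.length < 2) {
            throw new IllegalArgumentException(
                "expected at least two '^'-separated fields: " + tuple);
        }
        return fields[1];
    }

    // Hypothetical routing rule, for illustration only: keep the low bits of
    // the key's hash code. bucketCount is assumed to be a power of two so that
    // (bucketCount - 1) is a valid bit mask; the result is never negative.
    public static int bucketFor(String tuple, int bucketCount) {
        int mask = bucketCount - 1;
        return extractKey(tuple).hashCode() & mask;
    }

    public static void main(String[] args) {
        // Two made-up records that share the key "EX42" in the second field.
        String first = "2016-07-07^EX42^click";
        String second = "2016-07-08^EX42^view";

        // Equal keys have equal hash codes, so both records get the same bucket.
        System.out.println(bucketFor(first, 4) == bucketFor(second, 4)); // prints true
    }
}
```

The point of the sketch is only that a partition value derived purely from the key is the same for every record carrying that key, whatever the other fields contain.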
