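Where are you setting the StreamCodec? The codec only takes effect once it is attached to the input port of the partitioned operator. Here is one way to do it (replace campaignProcessor.input with the input port of your own operator):

    dag.setInputPortAttribute(campaignProcessor.input, Context.PortContext.STREAM_CODEC, new MyCodec());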
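
For reference, here is a standalone sketch of what the getPartition method in MyCodec computes, and how a mask over that value could select one of three partitions. It does not use any Apex classes. The class name PartitionKeySketch, the route helper, the 0x3 mask, and the key-to-partition table are all assumptions made for illustration, not the engine's actual routing code.

```java
import java.util.Arrays;
import java.util.List;

// Standalone sketch: no Apex classes are used here.
class PartitionKeySketch {

    // Same key extraction as MyCodec.getPartition in the quoted mail:
    // hash the second '^'-delimited field of the tuple.
    static int partitionHash(String tuple) {
        String[] fields = tuple.split("\\^");
        return fields[1].hashCode();
    }

    // ASSUMPTION: only the low bits of the hash (selected by a mask) are
    // used, and each masked value is assigned to one partition through a
    // lookup table. Both the mask and the table are hypothetical.
    static int route(int hash, int mask, int[] keyToPartition) {
        return keyToPartition[hash & mask];
    }

    public static void main(String[] args) {
        int mask = 0x3;                      // two bits are enough for 3 partitions
        int[] keyToPartition = {0, 1, 2, 0}; // hypothetical assignment of masked values
        List<String> tuples = Arrays.asList("a^ex1^x", "b^ex1^y", "c^ex2^z");
        for (String t : tuples) {
            int partition = route(partitionHash(t), mask, keyToPartition);
            System.out.println(t + " -> partition " + partition);
        }
    }
}
```

Two tuples with the same second field always produce the same hash, so they map to the same partition. That only helps if the codec is actually set on the port; otherwise getPartition is never called.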

On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta <[email protected]> wrote:

> Raja,
>
> Is MyOperator the actual name of the operator within your dag? If not,
> can you replace it with the actual name of the operator and try.
>
> Regards,
> Ashwin.
>
> On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli <[email protected]> wrote:
>
>> Also, to share some info on the partitioner I am using:
>>
>> I am using Stateless Partitioner with below code:
>>
>> <property>
>>   <name>dt.operator.MyOperator.attr.PARTITIONER</name>
>>   <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
>> </property>
>>
>> Thanks.
>>
>> Regards,
>> Raja.
>>
>> From: "Raja.Aravapalli" <[email protected]>
>> Date: Thursday, July 7, 2016 at 3:20 PM
>> To: "[email protected]" <[email protected]>
>> Subject: hashing
>>
>> Hi,
>>
>> I have an operator which is running in 3 instances, i.e. partitions. And I
>> want all the records with the same key (my key is of type String) to be
>> transferred to the same instance/partition.
>>
>> But I am unable to achieve this with my codec below.
>>
>> import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
>>
>> public class MyCodec extends KryoSerializableStreamCodec<String> {
>>
>>     @Override
>>     public int getPartition(String tuple) {
>>
>>         String[] toSplit = tuple.split("\\^");
>>         String exId = toSplit[1];
>>
>>         return exId.hashCode();
>>
>>     }
>>
>> }
>>
>> Any guidance please...
>>
>> Thanks a lot.
>>
>> Regards,
>> Raja.
>
> --
>
> Regards,
> Ashwin.
