Where are you setting the StreamCodec?

Here is one method,
 dag.setInputPortAttribute(campaignProcessor.input,
Context.PortContext.STREAM_CODEC, new MyCodec());


On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta <
[email protected]> wrote:

> Raja,
>
> Is MyOperator the actual name of the operator within your dag? If not,
> can you replace it with the actual name of the operator and try.
>
> Regards,
> Ashwin.
>
> On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli <
> [email protected]> wrote:
>
>>
>>
>> Also, to share some info on the parititoner I am using:
>>
>> I am using Stateless Partitioner with below code:
>>
>> <property>
>>     <name>dt.operator.MyOperator.attr.PARTITIONER</name>
>>     <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
>> </property>
>>
>>
>> Thanks.
>>
>> Regards,
>> Raja.
>>
>> From: "Raja.Aravapalli" <[email protected]>
>> Date: Thursday, July 7, 2016 at 3:20 PM
>> To: "[email protected]" <[email protected]>
>> Subject: hashing
>>
>>
>> Hi,
>>
>> I have an operator, which is running in 3 instances I.e partions… And, I
>> want all the records with same key, here my key is "String" type, to be
>> transferred to same instance/partition.
>>
>> But, I am unable to achieve this with my below codec.
>>
>>
>> import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
>>
>> public class MyCodec extends KryoSerializableStreamCodec<String> {
>>
>>     @Override
>>     public int getPartition(String tuple) {
>>
>>         String[] toSplit = tuple.split("\\^");
>>         String exId = toSplit[1];
>>
>>         return exId.hashCode();
>>
>>     }
>>
>> }
>>
>>
>>
>> Any guidance please…
>>
>>
>> Thanks a lot.
>>
>>
>> Regards,
>> Raja.
>>
>
>
>
> --
>
> Regards,
> Ashwin.
>

Reply via email to