Please take a look at the sample application at:
https://github.com/amberarrow/samples/tree/master/custom-codec

It uses the StreamCodec that you provided; tuples have the form
"i^j^hello", where i runs sequentially through the natural numbers
and j (the key) cycles through the range (1..5).
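
To make the tuple format concrete, here is a minimal sketch of how such
tuples could be built and how the key can be pulled back out. The class
and method names (TupleFormat, makeTuple, keyOf) are hypothetical and not
taken from the sample application, and the formula for j is only an
assumption chosen so that it agrees with the log lines in this mail.

```java
// Hypothetical helper, not part of the sample application.
final class TupleFormat {

    // Build "i^j^hello", where j cycles through 1..5 as i increases.
    // Assumption: j = ((i - 1) % 5) + 1, which agrees with the logged tuples.
    static String makeTuple(long i) {
        long key = ((i - 1) % 5) + 1;
        return i + "^" + key + "^hello";
    }

    // Extract the key (second field). '^' is a regex metacharacter,
    // so it has to be escaped in the split pattern.
    static String keyOf(String tuple) {
        return tuple.split("\\^")[1];
    }

    public static void main(String[] args) {
        for (long i = 1; i <= 7; i++) {
            String tuple = makeTuple(i);
            System.out.println(tuple + " -> key " + keyOf(tuple));
        }
    }
}
```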

When I run this application on the sandbox:

Tuples with key 1 or 5 are all coming to one operator, for example:
2016-07-09 19:49:37,068 DEBUG myapexapp.TestPartition process - 0: tuple = 10210^5^hello, operator id = 3
2016-07-09 19:49:37,068 DEBUG myapexapp.TestPartition process - 1: tuple = 10211^1^hello, operator id = 3

Tuples with key 2 are all coming to one operator, for example:
2016-07-09 19:46:54,558 DEBUG myapexapp.TestPartition process - 0: tuple = 6962^2^hello, operator id = 4
2016-07-09 19:46:54,558 DEBUG myapexapp.TestPartition process - 1: tuple = 6967^2^hello, operator id = 4

Tuples with key 3 or 4 are all coming to one operator, for example:
2016-07-09 19:43:01,058 DEBUG myapexapp.TestPartition process - 0: tuple = 2293^3^hello, operator id = 2
2016-07-09 19:43:01,059 DEBUG myapexapp.TestPartition process - 1: tuple = 2294^4^hello, operator id = 2
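
One possible reading of this grouping, sketched below, is that only the
low bits of the value returned by getPartition() are used for routing.
The sketch assumes a 2-bit mask (0x3) for 3 partitions, with one
partition serving two of the four masked values; that is an assumption
about the platform's behavior, not something established in this thread.
The class name PartitionMaskDemo is hypothetical.

```java
// Hypothetical demo; the mask value is an assumption, not a confirmed
// detail of how the platform routes tuples.
final class PartitionMaskDemo {

    // Assumed mask: 3 partitions need 2 bits, giving masked values 0..3.
    static final int MASK = 0x3;

    // Same hash the codec returns, reduced to the assumed low bits.
    static int maskedPartition(String key) {
        return key.hashCode() & MASK;
    }

    public static void main(String[] args) {
        for (int j = 1; j <= 5; j++) {
            String key = Integer.toString(j);
            System.out.println("key " + key + " -> hashCode " + key.hashCode()
                    + " -> masked value " + maskedPartition(key));
        }
    }
}
```

Under that assumption, keys 1 and 5 both map to masked value 1, key 2
maps to 2, key 3 maps to 3, and key 4 maps to 0. That would match the
logs if a single partition handles both masked values 0 and 3.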

So, could you elaborate on what you're seeing and how it differs from what
you're expecting?

Ram

On Thu, Jul 7, 2016 at 3:37 PM, Raja.Aravapalli <[email protected]>
wrote:

>
>
> Hi Sandesh,
>
>
> Below is how I am setting the codec:
>
> SsnCheckCodec ssnCodec = new SsnCheckCodec(3);
> dag.setInputPortAttribute(ssnCheck.input, PortContext.STREAM_CODEC, ssnCodec);
>
>
> Also, one quick question,
>
> When an operator is running in multiple instances… will the records
> received by each individual instance be processed in the same order
> in which they arrived at that instance of the operator?
>
> Regards,
> Raja.
>
> From: Sandesh Hegde <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, July 7, 2016 at 4:29 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: hashing
>
> Where are you setting the StreamCodec?
>
> Here is one method,
>  dag.setInputPortAttribute(campaignProcessor.input,
> Context.PortContext.STREAM_CODEC, new MyCodec());
>
>
> On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta <
> [email protected]> wrote:
>
>> Raja,
>>
>> Is MyOperator the actual name of the operator within your dag? If not,
>> can you replace it with the actual name of the operator and try again?
>>
>> Regards,
>> Ashwin.
>>
>> On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>>
>>> Also, to share some info on the partitioner I am using:
>>>
>>> I am using the StatelessPartitioner with the configuration below:
>>>
>>> <property>
>>>     <name>dt.operator.MyOperator.attr.PARTITIONER</name>
>>>     <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
>>> </property>
>>>
>>>
>>> Thanks.
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: "Raja.Aravapalli" <[email protected]>
>>> Date: Thursday, July 7, 2016 at 3:20 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: hashing
>>>
>>>
>>> Hi,
>>>
>>> I have an operator that runs as 3 instances, i.e. partitions… I want
>>> all the records with the same key (my key is of type "String") to be
>>> sent to the same instance/partition.
>>>
>>> But I am unable to achieve this with the codec below.
>>>
>>>
>>> import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
>>>
>>> public class MyCodec extends KryoSerializableStreamCodec<String> {
>>>
>>>     @Override
>>>     public int getPartition(String tuple) {
>>>
>>>         String[] toSplit = tuple.split("\\^");
>>>         String exId = toSplit[1];
>>>
>>>         return exId.hashCode();
>>>
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>> Any guidance please…
>>>
>>>
>>> Thanks a lot.
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>
>>
>>
>> --
>>
>> Regards,
>> Ashwin.
>>
>
