Hi Sandesh,


Below is how I am setting the codec:

SsnCheckCodec ssnCodec = new SsnCheckCodec(3);
dag.setInputPortAttribute(ssnCheck.input, PortContext.STREAM_CODEC, ssnCodec);

Also, one quick question:

When an operator is running in multiple instances, will the records received
by each individual instance be processed in the same order in which they
arrived at that instance?

Regards,
Raja.

From: Sandesh Hegde <[email protected]>
Reply-To: <[email protected]>
Date: Thursday, July 7, 2016 at 4:29 PM
To: <[email protected]>
Subject: Re: hashing

Where are you setting the StreamCodec?

Here is one method,
dag.setInputPortAttribute(campaignProcessor.input,
    Context.PortContext.STREAM_CODEC, new MyCodec());


On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta <[email protected]> wrote:
Raja,

Is MyOperator the actual name of the operator within your DAG? If not, can you
replace it with the actual name of the operator and try again?

Regards,
Ashwin.

On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli <[email protected]> wrote:


Also, to share some info on the partitioner I am using:

I am using the StatelessPartitioner with the configuration below:


<property>
    <name>dt.operator.MyOperator.attr.PARTITIONER</name>
    <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
</property>

Thanks.

Regards,
Raja.

From: "Raja.Aravapalli" <[email protected]>
Date: Thursday, July 7, 2016 at 3:20 PM
To: <[email protected]>
Subject: hashing


Hi,

I have an operator that is running in 3 instances, i.e. partitions, and I want
all records with the same key (my key is of type String) to be sent to the
same instance/partition.

However, I am unable to achieve this with the codec below.



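import com.datatorrent.lib.codec.KryoSerializableStreamCodec;

// Routes tuples by the second '^'-delimited field so that equal keys
// produce the same partition hash.
public class MyCodec extends KryoSerializableStreamCodec<String> {

    @Override
    public int getPartition(String tuple) {
        String[] toSplit = tuple.split("\\^");

        // Guard against tuples without a second field; falling back to the
        // whole tuple's hash is an assumption, not part of the original code.
        if (toSplit.length < 2) {
            return tuple.hashCode();
        }

        String exId = toSplit[1];
        return exId.hashCode();
    }

}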
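
To check the key extraction independently of the platform, a minimal
standalone sketch may help. It assumes the same '^'-delimited layout as the
codec above. The class name KeyPartitionSketch, the helpers keyOf and
partitionFor, and the sample tuples are hypothetical, and the modulo step is
only a stand-in for how the platform maps the returned hash to an instance; it
is not the platform's actual routing logic.

```java
public class KeyPartitionSketch {

    // Extracts the second '^'-delimited field, which the codec uses as the key.
    static String keyOf(String tuple) {
        String[] fields = tuple.split("\\^");
        if (fields.length < 2) {
            throw new IllegalArgumentException(
                "Expected at least two '^'-separated fields: " + tuple);
        }
        return fields[1];
    }

    // Stand-in for the routing step (an assumption, not platform code):
    // floorMod keeps the result in [0, partitions) even for negative hashes.
    static int partitionFor(String tuple, int partitions) {
        return Math.floorMod(keyOf(tuple).hashCode(), partitions);
    }

    public static void main(String[] args) {
        // Hypothetical sample tuples; the first and third share the key "EX1".
        String[] tuples = {"a^EX1^x", "b^EX2^y", "c^EX1^z"};
        for (String t : tuples) {
            System.out.println(t + " -> partition " + partitionFor(t, 3));
        }
    }
}
```

Tuples that share the second field always get the same value from
partitionFor, so if records with the same key still land on different
instances, the codec is probably not being applied to the port in question.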


Any guidance please…


Thanks a lot.


Regards,
Raja.



--

Regards,
Ashwin.
