Take a look at the attached example, which sends 3 strings to 3 different
instances.
Yes, the tuples going to one partition will be processed in the order they
are sent to that partition.
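To make both points concrete, here is a small standalone Java model with no Apex dependency. It is a sketch under stated assumptions, not the platform's code. `RoutingModel`, `keyOf`, `partitionOf` and `route` are hypothetical names. `keyOf` mirrors the key extraction in MyCodec below. Routing is simplified to a non-negative hash modulo the partition count rather than the partitioner's actual key assignment. The model shows that tuples with the same key always land in the same partition, and that each partition receives them in the order they were sent.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of key-based routing; no Apex classes are used.
public class RoutingModel {

    // Hypothetical helper mirroring MyCodec: the key is the second '^'-separated field.
    // Falls back to the whole tuple when there is no second field (MyCodec would throw there).
    public static String keyOf(String tuple) {
        String[] parts = tuple.split("\\^");
        return parts.length > 1 ? parts[1] : tuple;
    }

    // Simplified routing (an assumption, not Apex's scheme): non-negative hash modulo partition count.
    public static int partitionOf(String tuple, int partitions) {
        return Math.floorMod(keyOf(tuple).hashCode(), partitions);
    }

    // Appends each tuple to its partition's list, so every list keeps the send order.
    public static List<List<String>> route(String[] sent, int partitions) {
        List<List<String>> received = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            received.add(new ArrayList<>());
        }
        for (String t : sent) {
            received.get(partitionOf(t, partitions)).add(t);
        }
        return received;
    }

    public static void main(String[] args) {
        String[] sent = {"a^k1^1", "b^k2^2", "c^k1^3", "d^k3^4", "e^k1^5"};
        List<List<String>> received = route(sent, 3);
        for (int i = 0; i < received.size(); i++) {
            System.out.println("partition " + i + ": " + received.get(i));
        }
    }
}
```

Running main prints one list per partition. All three `k1` tuples end up in the same list, in send order. Other keys may share that list if their hashes collide, which is expected with hash partitioning.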
On Thu, Jul 7, 2016 at 3:57 PM Raja.Aravapalli <[email protected]>
wrote:
>
>
> Hi Sandesh,
>
>
> Below is how I am setting the codec:
>
> SsnCheckCodec ssnCodec = new SsnCheckCodec(3);
> dag.setInputPortAttribute(ssnCheck.input, PortContext.STREAM_CODEC, ssnCodec);
>
>
> Also, one quick question:
>
> When an operator is running in multiple instances, will the records
> received by each individual instance be processed in the same order
> they arrived at that instance of the operator?
>
> Regards,
> Raja.
>
> From: Sandesh Hegde <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, July 7, 2016 at 4:29 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: hashing
>
> Where are you setting the StreamCodec?
>
> Here is one method,
> dag.setInputPortAttribute(campaignProcessor.input,
> Context.PortContext.STREAM_CODEC, new MyCodec());
>
>
> On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta <
> [email protected]> wrote:
>
>> Raja,
>>
>> Is MyOperator the actual name of the operator within your dag? If not,
>> can you replace it with the actual name of the operator and try.
>>
>> Regards,
>> Ashwin.
>>
>> On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>>
>>> Also, to share some info on the partitioner I am using:
>>>
>>> I am using the StatelessPartitioner with the code below:
>>>
>>> <property>
>>> <name>dt.operator.MyOperator.attr.PARTITIONER</name>
>>> <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
>>> </property>
>>>
>>>
>>> Thanks.
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: "Raja.Aravapalli" <[email protected]>
>>> Date: Thursday, July 7, 2016 at 3:20 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: hashing
>>>
>>>
>>> Hi,
>>>
>>> I have an operator which is running in 3 instances, i.e. partitions, and I
>>> want all the records with the same key (my key is of type String) to be
>>> transferred to the same instance/partition.
>>>
>>> But I am unable to achieve this with the codec below.
>>>
>>>
>>> import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
>>>
>>> public class MyCodec extends KryoSerializableStreamCodec<String> {
>>>
>>>     @Override
>>>     public int getPartition(String tuple) {
>>>
>>>         String[] toSplit = tuple.split("\\^");
>>>         String exId = toSplit[1];
>>>
>>>         return exId.hashCode();
>>>
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>> Any guidance please…
>>>
>>>
>>> Thanks a lot.
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>
>>
>>
>> --
>>
>> Regards,
>> Ashwin.
>>
>
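import java.util.Random;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name="Application5")
public class Application5 implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    Generator generator = dag.addOperator("input", new Generator());
    ConsoleOutputOperator consoleOutputOperator = dag.addOperator("output", new ConsoleOutputOperator());
    dag.addStream("stream", generator.outputPort, consoleOutputOperator.input);
    // Run 3 partitions of the "output" operator.
    dag.setOperatorAttribute(consoleOutputOperator, Context.OperatorContext.PARTITIONER,
        new StatelessPartitioner<ConsoleOutputOperator>(3));
    // The codec set on the downstream input port decides which partition receives each tuple.
    dag.setInputPortAttribute(consoleOutputOperator.input, Context.PortContext.STREAM_CODEC, new Codec());
  }

  // Emits one of "x-0", "x-1", "x-2" about every 200 ms.
  public static class Generator extends BaseOperator implements InputOperator
  {
    public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
    String[] names = {"x-0", "x-1", "x-2"};
    Random random = new Random();

    @Override
    public void emitTuples()
    {
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and skip this round instead of swallowing the interrupt.
        Thread.currentThread().interrupt();
        return;
      }
      outputPort.emit(names[random.nextInt(names.length)]);
    }
  }

  // Partition key is the number after "-": x-0 -> 0, x-1 -> 1, x-2 -> 2.
  // Originally extended DefaultKryoStreamCodec; KryoSerializableStreamCodec is the Malhar
  // codec used earlier in this thread and exposes the same getPartition hook.
  public static class Codec extends KryoSerializableStreamCodec<String>
  {
    @Override
    public int getPartition(String t)
    {
      return Integer.parseInt(t.split("-")[1]);
    }
  }
}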
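In Application5 the partitioner is set in code. If it were set through a property, as in the original question, the `<name>` in `dt.operator.<name>.attr.PARTITIONER` would have to be `output`, the name passed to `dag.addOperator`. This is Ashwin's point about `MyOperator`. The hypothetical helper below (`PropertyNameCheck` and `unknownOperators` are made-up names, not an Apex API) is one way to catch that mismatch. It is plain Java with no Apex dependency, and it lists operator names used in property keys that do not appear in the DAG.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Apex: flags dt.operator.<name>.* keys whose <name>
// is not one of the names passed to dag.addOperator(...). Wildcard-style keys are not handled.
public class PropertyNameCheck {

    static final String PREFIX = "dt.operator.";

    public static List<String> unknownOperators(List<String> propertyKeys, Set<String> dagOperatorNames) {
        List<String> unknown = new ArrayList<>();
        for (String key : propertyKeys) {
            if (!key.startsWith(PREFIX)) {
                continue; // not an operator-scoped property
            }
            String rest = key.substring(PREFIX.length());
            int dot = rest.indexOf('.');
            String name = dot < 0 ? rest : rest.substring(0, dot);
            if (!dagOperatorNames.contains(name)) {
                unknown.add(name);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        // Operator names used in Application5's populateDAG.
        Set<String> names = new TreeSet<>(Arrays.asList("input", "output"));
        List<String> keys = Arrays.asList(
            "dt.operator.MyOperator.attr.PARTITIONER",
            "dt.operator.output.attr.PARTITIONER");
        System.out.println(unknownOperators(keys, names)); // prints [MyOperator]
    }
}
```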