Take a look at the attached example, which sends 3 strings to 3 different
instances.

Yes, the tuples going to one partition will be processed in the order they
are sent to that partition.

On Thu, Jul 7, 2016 at 3:57 PM Raja.Aravapalli <[email protected]>
wrote:

>
>
> Hi Sandesh,
>
>
> Below is the way I am setting codec:
>
> SsnCheckCodec ssnCodec = new SsnCheckCodec(3);
> dag.setInputPortAttribute(ssnCheck.input, PortContext.STREAM_CODEC, ssnCodec);
>
>
> Also, one quick question,
>
> When some operator is running in multiple instances…  will the records
> received by each individual instance will be processed in the same order
> they arrived to that instance of operator ?
>
> Regards,
> Raja.
>
> From: Sandesh Hegde <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Thursday, July 7, 2016 at 4:29 PM
> To: "[email protected]" <[email protected]>
> Subject: Re: hashing
>
> Where are you setting the StreamCodec?
>
> Here is one method,
>  dag.setInputPortAttribute(campaignProcessor.input,
> Context.PortContext.STREAM_CODEC, new MyCodec());
>
>
> On Thu, Jul 7, 2016 at 1:57 PM Ashwin Chandra Putta <
> [email protected]> wrote:
>
>> Raja,
>>
>> Is MyOperator the actual name of the operator within your dag? If not,
>> can you replace it with the actual name of the operator and try.
>>
>> Regards,
>> Ashwin.
>>
>> On Thu, Jul 7, 2016 at 1:28 PM, Raja.Aravapalli <
>> [email protected]> wrote:
>>
>>>
>>>
>>> Also, to share some info on the parititoner I am using:
>>>
>>> I am using Stateless Partitioner with below code:
>>>
>>> <property>
>>>     <name>dt.operator.MyOperator.attr.PARTITIONER</name>
>>>     <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value>
>>> </property>
>>>
>>>
>>> Thanks.
>>>
>>> Regards,
>>> Raja.
>>>
>>> From: "Raja.Aravapalli" <[email protected]>
>>> Date: Thursday, July 7, 2016 at 3:20 PM
>>> To: "[email protected]" <[email protected]>
>>> Subject: hashing
>>>
>>>
>>> Hi,
>>>
>>> I have an operator, which is running in 3 instances I.e partions… And, I
>>> want all the records with same key, here my key is "String" type, to be
>>> transferred to same instance/partition.
>>>
>>> But, I am unable to achieve this with my below codec.
>>>
>>>
>>> import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
>>>
>>> public class MyCodec extends KryoSerializableStreamCodec<String> {
>>>
>>>     @Override
>>>     public int getPartition(String tuple) {
>>>
>>>         String[] toSplit = tuple.split("\\^");
>>>         String exId = toSplit[1];
>>>
>>>         return exId.hashCode();
>>>
>>>     }
>>>
>>> }
>>>
>>>
>>>
>>> Any guidance please…
>>>
>>>
>>> Thanks a lot.
>>>
>>>
>>> Regards,
>>> Raja.
>>>
>>
>>
>>
>> --
>>
>> Regards,
>> Ashwin.
>>
>
@ApplicationAnnotation(name="Application5")
public class Application5 implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    Generator generator = dag.addOperator("input", new Generator());
    ConsoleOutputOperator consoleOutputOperator = dag.addOperator("output", new ConsoleOutputOperator());

    dag.addStream("stream", generator.outputPort, consoleOutputOperator.input );

    dag.setOperatorAttribute(consoleOutputOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<ConsoleOutputOperator>(3));
    dag.setInputPortAttribute(consoleOutputOperator.input, Context.PortContext.STREAM_CODEC, new Codec());
  }

  public static class Generator extends BaseOperator implements InputOperator
  {
    String names[] = {"x-0", "x-1", "x-2"};
    Random random = new Random();

    @Override
    public void emitTuples()
    {
      try {
        Thread.sleep(200);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }

      outputPort.emit(names[random.nextInt(names.length)]);

    }

    public transient final DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
  }

  public static class Codec extends DefaultKryoStreamCodec<String>
  {
    @Override
    public int getPartition(String t)
    {
      return Integer.parseInt(t.split("-")[1]);
    }
  }
}

Reply via email to