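Apex uses the StreamCodec set on an input port to decide which downstream
partition receives each tuple. The default codec uses the tuple's hashCode(),
which is not what you want here. You need to implement a custom StreamCodec
whose getPartition() returns a value derived from the user id, so that all
tuples with the same id go to the same partition.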

One example is at the link below and another one is attached.
https://github.com/DataTorrent/examples/tree/master/tutorials/partition
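
For the PoJo in the question, the idea can be sketched in plain Java with no
Apex dependency. Everything named here (PojoPartitionSketch, partitionKey,
route, group) is hypothetical and only for illustration. In a real application
the body of partitionKey would go into the getPartition() override of your
codec, and Apex does the routing itself; the modulo in route is a
simplification of that routing, not what Apex actually runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PojoPartitionSketch {
  // Same shape as the PoJo in the question.
  static class PoJo {
    final int id;
    final String value;

    PoJo(int id, String value) {
      this.id = id;
      this.value = value;
    }
  }

  // Hypothetical stand-in for the body of getPartition(PoJo):
  // the result depends only on the key field, never on 'value'.
  static int partitionKey(PoJo tuple) {
    return tuple.id;
  }

  // Simplified routing (an assumption for this sketch): key modulo partition count.
  static int route(PoJo tuple, int partitionCount) {
    return Math.floorMod(partitionKey(tuple), partitionCount);
  }

  // Collect the ids that each partition would receive.
  static Map<Integer, List<Integer>> group(List<PoJo> tuples, int partitionCount) {
    Map<Integer, List<Integer>> idsByPartition = new TreeMap<>();
    for (PoJo tuple : tuples) {
      idsByPartition
          .computeIfAbsent(route(tuple, partitionCount), p -> new ArrayList<>())
          .add(tuple.id);
    }
    return idsByPartition;
  }

  public static void main(String[] args) {
    List<PoJo> incoming = Arrays.asList(
        new PoJo(1, "abc"), new PoJo(2, "def"), new PoJo(1, "ghi"), new PoJo(4, "jkl"));
    group(incoming, 2).forEach((partition, ids) ->
        System.out.println("partition " + partition + " <- ids " + ids));
  }
}
```

Because the key ignores the value field, both tuples with id 1 land on the
same partition no matter which Kafka partition they came from.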


On Sat, Sep 24, 2016 at 3:40 PM Chris Stockton <[email protected]> wrote:

> If I have a KafkaInputOperator that is reading from 2 partitions and I
> set up a stream to a one-to-one operator, how would I read the messages from
> each Kafka partition, extract a particular field (say user id) and then
> send all the keys to a parallel downstream operator so that the keys are
> grouped together?
>
> Something like this:
>
> Messages coming in on kafka are json of the format {'user_id': 123,
> 'value': 'abc'} where the user_id is some number.  The messages are not
> partitioned on the Kafka topic according to the user_id.
>
> Incoming msgs:             repartition by id
>
> id=1, id=2 ==>  K1 -> JsonToPojo --   --> PoJo(1), PoJo(3), PoJo(1)
>                                    \ /
>                                     X
>                                    / \
> id=1, id=4 ==>  K2 -> JsonToPojo --   --> PoJo(4)
>
> Where I transform the Json messages into a POJO like:
>
> class PoJo {
>   int id;
>   String value;
>   ...
> }
>
> Would 'X' need to be a combination of a StreamMerger and some kind of
> custom partitioner, or could the JsonToPojo operator directly partition
> its output based on the id value?
>
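
// Package names below are as in Apex Core / Malhar 3.x; adjust if your version differs.
import java.util.Random;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.stram.plan.logical.DefaultKryoStreamCodec;

@ApplicationAnnotation(name="Application5")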
public class Application5 implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration configuration)
  {
    Generator generator = dag.addOperator("input", new Generator());
    ConsoleOutputOperator consoleOutputOperator = dag.addOperator("output", new ConsoleOutputOperator());

    dag.addStream("stream", generator.outputPort, consoleOutputOperator.input );

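    // Run three partitions of the console operator.
    dag.setOperatorAttribute(consoleOutputOperator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<ConsoleOutputOperator>(3));
    // Route tuples to those partitions with the custom codec instead of the default one.
    dag.setInputPortAttribute(consoleOutputOperator.input, Context.PortContext.STREAM_CODEC, new Codec());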
  }

  public static class Generator extends BaseOperator implements InputOperator
  {
    String[] names = {"x-0", "x-1", "x-2"};
    Random random = new Random();

    @Override
    public void emitTuples()
    {
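      try {
        // Throttle the generator to roughly five tuples per second.
        Thread.sleep(200);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and skip this emit.
        Thread.currentThread().interrupt();
        return;
      }

      outputPort.emit(names[random.nextInt(names.length)]);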
    }

    public transient final DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
  }

  public static class Codec extends DefaultKryoStreamCodec<String>
  {
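    // Partition on the numeric suffix of the name ("x-2" -> 2), so all tuples
    // with the same name reach the same downstream partition.
    @Override
    public int getPartition(String t)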
    {
      return Integer.parseInt(t.split("-")[1]);
    }
  }
}
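
The value returned by getPartition() is not used directly as a partition
index. Each partition owns a set of partition keys under a bit mask, and a
tuple goes to the partition that owns (getPartition(t) & mask). The sketch
below imitates that in plain Java for the three names used above. The class
and method names are hypothetical, and the way keys are handed out (keys
0..mask assigned round-robin) is an assumption about what a stateless
partitioner does, not code taken from Apex.

```java
class MaskRoutingSketch {
  // Same logic as Codec.getPartition in the example above.
  static int getPartition(String t) {
    return Integer.parseInt(t.split("-")[1]);
  }

  // Smallest all-ones bit mask with at least partitionCount distinct keys.
  static int maskFor(int partitionCount) {
    int bits = 32 - Integer.numberOfLeadingZeros(partitionCount - 1);
    return (1 << bits) - 1;
  }

  // Assumption: keys 0..mask are handed to the partitions round-robin.
  // owner[key] is the index of the partition that receives that key.
  static int[] assignKeys(int partitionCount) {
    int mask = maskFor(partitionCount);
    int[] owner = new int[mask + 1];
    for (int key = 0; key <= mask; key++) {
      owner[key] = key % partitionCount;
    }
    return owner;
  }

  // Mask the codec's value, then look up which partition owns that key.
  static int route(String tuple, int partitionCount) {
    int key = getPartition(tuple) & maskFor(partitionCount);
    return assignKeys(partitionCount)[key];
  }

  public static void main(String[] args) {
    for (String name : new String[] {"x-0", "x-1", "x-2"}) {
      System.out.println(name + " -> partition " + route(name, 3));
    }
  }
}
```

With three partitions the mask has two bits, so there are four keys for three
partitions and one partition owns two of them. The three names in the example
each get a partition of their own; a fourth suffix would share one.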
