You can implement your own grouping by using direct grouping. From the
Storm documentation: "*Direct grouping*: This is a special kind of
grouping. A stream grouped this way means that the *producer* of the tuple
decides which task of the consumer will receive this tuple. Direct
groupings can only be declared on streams that have been declared as
direct streams. Tuples emitted to a direct stream must be emitted using
one of the emitDirect methods. A bolt can get the task ids of its consumers
by either using the provided TopologyContext
or by keeping track of the output of the emit method in OutputCollector
(which returns the task ids that the tuple was sent to)."

Using direct grouping will let the bolt upstream of the ES writing bolt
decide which ES bolt receives a given message. So you could have spouts ->
sorter bolts -> ES bolts, where sorter bolts use direct grouping to
partition the stream by index id in whatever way you need.
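
To make the "whatever way you need" part concrete, here is a rough sketch
of how a sorter bolt could decide which ES task gets a tuple. It is plain
Java with no Storm dependency, and TaskAllocator, allocate and pickTask are
hypothetical names I made up for illustration. It assumes the sorter keeps
recent message counts per index id and splits the ES tasks proportionally
(largest-remainder rounding). An id whose share rounds down to zero tasks
simply falls back to spreading over all tasks, which you may want to handle
differently.

```java
import java.util.Arrays;

/** Hypothetical helper: splits ES writer tasks across index ids by recent volume. */
final class TaskAllocator {

    /**
     * Returns how many of numTasks each id gets, proportional to counts,
     * using largest-remainder rounding so the shares sum to numTasks.
     */
    static int[] allocate(long[] counts, int numTasks) {
        int n = counts.length;
        int[] alloc = new int[n];
        long total = Arrays.stream(counts).sum();
        if (total == 0) {
            return alloc; // no traffic seen yet: nobody owns a task
        }
        long[] remainder = new long[n];
        int assigned = 0;
        for (int i = 0; i < n; i++) {
            long scaled = counts[i] * numTasks;
            alloc[i] = (int) (scaled / total); // whole tasks for this id
            remainder[i] = scaled % total;     // fractional part, scaled by total
            assigned += alloc[i];
        }
        // Hand the leftover tasks to the ids with the largest fractional share.
        for (; assigned < numTasks; assigned++) {
            int best = 0;
            for (int i = 1; i < n; i++) {
                if (remainder[i] > remainder[best]) {
                    best = i;
                }
            }
            alloc[best]++;
            remainder[best] = -1; // at most one extra task per id
        }
        return alloc;
    }

    /**
     * Picks a slot in [0, numTasks) for the seq-th message of the given id.
     * Each id owns a contiguous range of slots and rotates through it.
     */
    static int pickTask(int[] alloc, int numTasks, int id, long seq) {
        if (alloc[id] == 0) {
            return (int) (seq % numTasks); // fallback: spread over all tasks
        }
        int start = 0;
        for (int i = 0; i < id; i++) {
            start += alloc[i];
        }
        return start + (int) (seq % alloc[id]);
    }
}
```

With the numbers from the question quoted below, counts of 400 and 100 over
10 tasks give a split of 8 and 2, and equal counts give 5 and 5. In the
bolt, the returned slot would index into the list from
TopologyContext.getComponentTasks(...) for the ES bolt, and that task id is
what you pass to OutputCollector.emitDirect.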

On another note, ES' bulk API supports writing to multiple indices in one
call, so if you haven't already, you should benchmark the performance
penalty of mixing indices in one bulk API call. If the penalty is small,
you might still be fine with shuffle grouping.
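
For the benchmark, a mixed-index bulk request is just alternating action
and document lines, each ending in a newline. A minimal sketch of building
such a body by hand follows. BulkBodySketch and Doc are hypothetical names,
the documents are assumed to be already serialized JSON, and index names
are assumed to need no JSON escaping. ES versions that still use mapping
types would also need a _type, either in each action line or in the request
URL.

```java
import java.util.List;

/** Hypothetical helper: builds a bulk request body that mixes several indices. */
final class BulkBodySketch {

    /** One buffered message: target index name plus its JSON source. */
    static final class Doc {
        final String index;
        final String json;

        Doc(String index, String json) {
            this.index = index;
            this.json = json;
        }
    }

    /** Emits one action line and one source line per document. */
    static String buildBulkBody(List<Doc> docs) {
        StringBuilder sb = new StringBuilder();
        for (Doc d : docs) {
            // Action line: tells ES which index this document goes to.
            sb.append("{\"index\":{\"_index\":\"").append(d.index).append("\"}}\n");
            // Source line: the document itself, on a single line.
            sb.append(d.json).append('\n');
        }
        return sb.toString();
    }
}
```

Comparing the throughput of bodies built from one index against bodies
built from a random mix of your indices should tell you how much the
grouping is actually worth.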

2017-08-08 2:46 GMT+02:00 Jakes John <>:

> Hi,
>    I need a streaming pipeline Kafka -> Storm -> Elasticsearch. The
> volume of messages produced to Kafka is on the order of millions. Hence, I
> need maximum throughput for Elasticsearch writes. Each message has an id
> which is mapped to an Elasticsearch index. There are fewer than 50 possible
> message ids (which is also the max number of ES indices created). I would
> like to batch the ES writes so that messages are grouped by index *as much
> as possible*.
> The problem is that message counts per id are dynamic, and certain ids can
> have much larger message inflows than others. The largest message id can
> have more than 10x the inflow of the smallest. Hence, shuffle grouping on
> ids doesn't work here. Partial key grouping also won't work, as I need more
> output streams for the largest message ids.
> e.g. I have 10 tasks that write to ES
> Say all my messages are spread across 2 message ids, ID1 and ID2, which I
> have to write to 2 separate indices in ES
> Say ID1 has 4 times more messages than ID2 at one instant
> So, the best possible output would be,
> First 8 tasks write messages with ID1 to ES
> Last 2 tasks write messages with ID2 to ES
> Say at a different instant, ID1 has the same number of messages as ID2
> So, the best possible output would be,
> First 5 tasks write messages with ID1 to ES
> Last 5 tasks write messages with ID2 to ES
> The grouping is just an optimization, not a hard requirement. What is the
> best way to group messages *dynamically* on input streams with hugely
> varying message counts? Also, I have control over creating message ids,
> if that helps the data distribution.
