"Using direct grouping will let the bolt upstream of the ES writing bolt
decide which ES bolt receives a given message. So you could have spouts ->
sorter bolts -> ES bolts, where sorter bolts use direct grouping to
partition the stream by index id in whatever way you need."
What is the best way to use direct grouping dynamically? For example, the
distribution of index ids will vary over time: I might need more threads for
an index at one point and fewer at another.
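
To make the question concrete: the proportional split in the example quoted
below (10 tasks, ID1:ID2 at 4:1 giving 8 and 2) could be recomputed
periodically from recent per-id message counts. This is only a minimal sketch
in plain Java. The class and method names (TaskAllocator, allocate) are
hypothetical, the largest-remainder rounding rule is an assumption, and none
of it is Storm API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Storm: decides how many consumer tasks
// each index id should get, in proportion to its recent message count.
final class TaskAllocator {

    // Assumes non-negative counts. Uses largest-remainder rounding so the
    // shares always sum to numTasks when at least one count is positive.
    static Map<String, Integer> allocate(Map<String, Long> counts, int numTasks) {
        Map<String, Integer> shares = new LinkedHashMap<>();
        long total = 0;
        for (long c : counts.values()) {
            total += c;
        }
        if (total == 0) {
            // No traffic observed: nothing to apportion.
            for (String id : counts.keySet()) {
                shares.put(id, 0);
            }
            return shares;
        }
        final long totalCount = total;

        // First pass: give every id the floor of its exact proportional share.
        int assigned = 0;
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            int share = (int) (e.getValue() * numTasks / totalCount);
            shares.put(e.getKey(), share);
            assigned += share;
        }

        // Second pass: hand the leftover tasks to the ids with the largest
        // remainders (stable sort, so ties keep the map's iteration order).
        List<String> byRemainder = new ArrayList<>(counts.keySet());
        byRemainder.sort((a, b) -> Long.compare(
                counts.get(b) * numTasks % totalCount,
                counts.get(a) * numTasks % totalCount));
        for (int i = 0; i < numTasks - assigned; i++) {
            shares.merge(byRemainder.get(i), 1, Integer::sum);
        }
        return shares;
    }
}
```

In a sorter bolt, the resulting counts would still have to be mapped onto the
consumer task ids (available from the TopologyContext) before calling
emitDirect. Note that with this rounding rule an id with very little traffic
can end up with zero tasks, so a real implementation would probably need a
floor of one task per active id.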

On Tue, Aug 8, 2017 at 1:35 AM, Stig Rohde Døssing <s...@apache.org> wrote:

> You can implement your own grouping by using direct grouping (from
> http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html): "*Direct
> grouping*: This is a special kind of grouping. A stream grouped this way
> means that the *producer* of the tuple decides which task of the consumer
> will receive this tuple. Direct groupings can only be declared on streams
> that have been declared as direct streams. Tuples emitted to a direct
> stream must be emitted using one of the emitDirect methods of
> OutputCollector. A bolt can get the task ids of its consumers by
> either using the provided TopologyContext
> <http://storm.apache.org/releases/2.0.0-SNAPSHOT/javadocs/org/apache/storm/task/TopologyContext.html>
> or by keeping track of the output of the emit method in OutputCollector
> <http://storm.apache.org/releases/2.0.0-SNAPSHOT/javadocs/org/apache/storm/task/OutputCollector.html>
> (which returns the task ids that the tuple was sent to)."
>
> Using direct grouping will let the bolt upstream of the ES writing bolt
> decide which ES bolt receives a given message. So you could have spouts ->
> sorter bolts -> ES bolts, where sorter bolts use direct grouping to
> partition the stream by index id in whatever way you need.
>
> On another note, I want to say that ES' bulk API supports writing to
> multiple indices in one go, so if you haven't already you should benchmark
> to see what the performance penalty of mixing indices in one bulk API call
> would be. If the penalty isn't much, you might be fine with shuffle
> grouping still.
>
> 2017-08-08 2:46 GMT+02:00 Jakes John <jakesjohn12...@gmail.com>:
>
>> Hi,
>>
>> I need a streaming pipeline: Kafka -> Storm -> Elasticsearch. The volume
>> of messages produced to Kafka is on the order of millions, so I need
>> maximum throughput for Elasticsearch writes. Each message has an id that
>> maps to an Elasticsearch index. There are fewer than 50 possible message
>> ids (and therefore at most that many ES indices). I would like to batch
>> the ES writes so that messages are grouped by index *as much as possible*.
>> The problem is that message counts per id are dynamic, and certain ids can
>> have far larger message inflows than others. The largest message id can
>> have more than 10x the inflow of the smallest. Hence, shuffle grouping on
>> ids doesn't work here. Partial key grouping also won't work, as I need a
>> larger number of output streams for the largest message ids.
>>
>> For example, I have 10 tasks that write to ES.
>>
>>
>> Say all my messages are spread across 2 message ids, ID1 and ID2, which I
>> have to write to 2 separate indices in ES.
>>
>> Say ID1 has 4 times as many messages as ID2 at one instant.
>>
>> Then the best possible output would be:
>> The first 8 tasks write messages with ID1 to ES
>> The last 2 tasks write messages with ID2 to ES
>>
>>
>> Say at a different instant, ID1 has the same number of messages as ID2.
>>
>> Then the best possible output would be:
>> The first 5 tasks write messages with ID1 to ES
>> The last 5 tasks write messages with ID2 to ES
>>
>>
>> My grouping requirement is just an optimization, not a hard requirement.
>> What is the best way to group messages *dynamically* on input streams
>> with hugely varying message counts? Also, I have control over how message
>> ids are created, if that helps the data distribution.
>>
>
>
