Hi Satish,
Thanks for the response. That does sound something like what we need. We'll
give it a shot to see if it covers our use case.
Thanks,
Jason
From: Satish Duggana <[email protected]>
To: [email protected]; Jason Kania <[email protected]>
Sent: Saturday, June 11, 2016 1:10 PM
Subject: Re: Access to storm fieldsGrouping hashing method in bolt/spout
Hi,
It seems you want to send tuples back to the task of an earlier bolt, the one
from which the current bolt received them.
The initial bolt can include its own task ID
(org.apache.storm.task.TopologyContext#getThisTaskId()) in the tuple
fields. A subsequent bolt can then use that ID to emit tuples directly to the
earlier bolt's task using `OutputCollector#emitDirect(int taskId, String
streamId, Collection<Tuple> anchors, List<Object> tuple)`.
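
The round trip described above can be modelled without any Storm dependency. The sketch below is only an illustration: `DirectReturnSketch`, `Message`, `stamp` and `processAndReturn` are hypothetical names, and a map of in-memory inboxes stands in for the first bolt's tasks. It shows the first bolt attaching its task ID to the message and the second bolt using that ID to address the result.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DirectReturnSketch {
    // Stand-in for a tuple with two fields: the sender's task id and a payload.
    static final class Message {
        final int originTaskId;
        final String payload;

        Message(int originTaskId, String payload) {
            this.originTaskId = originTaskId;
            this.payload = payload;
        }
    }

    // Stand-in for the first bolt's tasks: task id -> payloads delivered to it.
    private final Map<Integer, List<String>> inboxByTaskId = new HashMap<>();

    void registerTask(int taskId) {
        inboxByTaskId.put(taskId, new ArrayList<>());
    }

    // First bolt: attach its own task id to the outgoing message
    // (in Storm this value would come from TopologyContext#getThisTaskId()).
    Message stamp(int thisTaskId, String payload) {
        return new Message(thisTaskId, payload);
    }

    // Second bolt: do its work, then address the result to the originating task
    // (in Storm this is the role played by OutputCollector#emitDirect).
    void processAndReturn(Message in) {
        String result = in.payload + ":processed";
        inboxByTaskId.get(in.originTaskId).add(result);
    }

    List<String> inboxOf(int taskId) {
        return inboxByTaskId.get(taskId);
    }

    public static void main(String[] args) {
        DirectReturnSketch topology = new DirectReturnSketch();
        topology.registerTask(3);
        topology.registerTask(7);
        topology.processAndReturn(topology.stamp(7, "media-123"));
        System.out.println(topology.inboxOf(7)); // [media-123:processed]
        System.out.println(topology.inboxOf(3)); // []
    }
}
```

In a real topology the return stream would also have to be declared as a direct stream, and the earlier bolt would subscribe to it with a direct grouping. Those wiring details are not shown here.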
Hope it helps,
Satish
On Fri, Jun 10, 2016 at 7:47 PM, Jason Kania <[email protected]> wrote:
Thanks for the response and the code reference.
To explain the use case, we would like to be able to use shuffle grouping for
initial load balancing but then use fields grouping and some generated token to
be able to get back to the same bolt after passing messages to a different bolt
doing other processing. We need to get back to the same box because a very
large media file has to be pulled and processed there, so we want the remaining
work to stay sticky to that box.
We have tried using fieldsGrouping on its own but we end up with asymmetries in
the load across the boxes simply because we only have 5 boxes doing this
specific processing and hashing does not allow it to be evenly balanced.
Right now, we are generating random token values and then sending them out to
be collected by the different bolts. The bolts then use the tokens that have
arrived as the fieldsGrouping key for subsequent operations. The ideal would be
if we could directly get a token that would allow return to the same bolt
instance to complete the job.
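
A trial-and-error token search of this kind could look roughly like the sketch below. It assumes the receiving task is chosen by hashing the list of grouping values and taking the result modulo the number of tasks. That is an assumption about Storm's behaviour which may not hold for every version. `StickyTokenSketch`, `targetIndex` and `tokenFor` are hypothetical names, and mapping an index to an actual Storm task is left out.

```java
import java.util.Collections;
import java.util.List;
import java.util.UUID;

public class StickyTokenSketch {
    // Assumed selection rule: hash the list of grouping values, then take it
    // modulo the task count. This is an assumption, not Storm's actual code.
    static int targetIndex(String token, int numTasks) {
        List<Object> key = Collections.<Object>singletonList(token);
        // floorMod keeps the result in [0, numTasks) for negative hash codes.
        return Math.floorMod(key.hashCode(), numTasks);
    }

    // Draw random tokens until one maps to the wanted index.
    // On average this takes about numTasks attempts.
    static String tokenFor(int wantedIndex, int numTasks) {
        if (wantedIndex < 0 || wantedIndex >= numTasks) {
            throw new IllegalArgumentException("wantedIndex out of range");
        }
        while (true) {
            String token = UUID.randomUUID().toString();
            if (targetIndex(token, numTasks) == wantedIndex) {
                return token;
            }
        }
    }

    public static void main(String[] args) {
        // Example: 5 processing tasks, and we want a token that maps to index 2.
        String token = tokenFor(2, 5);
        System.out.println(token + " -> " + targetIndex(token, 5));
    }
}
```

With only 5 tasks the loop finishes after a handful of attempts, so the cost of the search is negligible next to pulling a large media file.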
I am wondering if something like this would be worthy of an enhancement request?
Jason
From: Matthias J. Sax <[email protected]>
To: [email protected]
Sent: Wednesday, June 8, 2016 1:46 PM
Subject: Re: Access to storm fieldsGrouping hashing method in bolt/spout
I cannot completely follow your use case and what you want to accomplish...
However, even though there is no official API to call the hash function, it
is actually pretty simple. Storm internally creates a List of the
fieldsGrouping attributes and calls .hashCode() on it (and of course
applies a modulo afterwards).
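
As a minimal sketch of that description (not Storm's own code, and not checked against any particular Storm version), the index of the receiving task could be computed as follows. `FieldsGroupingSketch` and `targetIndex` are hypothetical names, and the use of `Math.floorMod` to keep the result non-negative is an assumption.

```java
import java.util.Arrays;
import java.util.List;

public class FieldsGroupingSketch {
    // Index into the receiver's task list for the given grouping-field values.
    static int targetIndex(List<Object> groupingValues, int numTasks) {
        // List.hashCode() combines the element hash codes; floorMod keeps the
        // result in [0, numTasks) even when that hash code is negative.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        // "a".hashCode() is 97, so the list hash is 31 * 1 + 97 = 128.
        List<Object> key = Arrays.<Object>asList("a");
        System.out.println(targetIndex(key, 5)); // 128 mod 5 = 3
    }
}
```

The result is an index into the list of receiving tasks rather than a task ID, so a caller would still need to look up the actual task at that position.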
I actually wrote a wrapper to call the internal hash function once (it
works for 0.9.3 -- not sure if it is compatible with newer versions).
You can find the code here:
https://github.com/mjsax/aeolus/tree/master/aeolus-storm-connector
The project assembles a Java class with a static method to expose a
simple-to-use Java API that calls internal Storm core stuff to get the
receiver task ID.
  StormConnector.getFieldsGroupingReceiverTaskId(
      TopologyContext ctx,
      String producer-component-id,
      String output-stream-id,
      String receiver-component-id,
      List<Object> tuple);
-Matthias
On 06/08/2016 07:02 PM, Jason Kania wrote:
> Hello,
>
> I am wondering if there is a means to access the hashing method/function
> that storm applies for the fieldsGrouping method. I would like to
> generate a token that will hash back to the current node so that
> subsequent processing can come back to the same node. I realize that
> generating an applicable token would be trial and error, but I would
> like to take advantage of shuffleGrouping to assign tasks but then use
> fieldsGrouping to ensure that the rest of the work for the task comes
> back to the same node.
>
> Thanks,
>
> Jason