Hi,

It seems you want to send tuples back to the task of the bolt from which the current bolt received them.
The initial bolt can include its own task ID (`org.apache.storm.task.TopologyContext#getThisTaskId()`) as one of the tuple's fields. The subsequent bolt can then use that ID to emit tuples directly to the earlier bolt's task with `OutputCollector#emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple)`. For this to work, the stream must be declared as a direct stream (`OutputFieldsDeclarer#declareStream(String streamId, boolean direct, Fields fields)` with `direct` set to `true`), and the earlier bolt must subscribe to it with `directGrouping`.

Hope it helps,
Satish

On Fri, Jun 10, 2016 at 7:47 PM, Jason Kania <[email protected]> wrote:

> Thanks for the response and the code reference.
>
> To explain the use case, we would like to use shuffle grouping for initial load balancing, but then use fields grouping and a generated token to get back to the same bolt after passing messages to a different bolt that does other processing. We need to get back to the same box because a very large media file has to be pulled and processed on that box, so we want to be sticky to it.
>
> We have tried using fieldsGrouping on its own, but we end up with an uneven load across the boxes, simply because we only have 5 boxes doing this specific processing and hashing does not balance the load evenly across so few targets.
>
> Right now, we are generating random token values and sending them out to be collected by the different bolts. The bolts then use the tokens that have arrived as the fieldsGrouping key for subsequent operations. Ideally, we could directly get a token that would allow a return to the same bolt instance to complete the job.
>
> I am wondering whether something like this would be worthy of an enhancement request?
>
> Jason
>
> ------------------------------
> *From:* Matthias J. Sax <[email protected]>
> *To:* [email protected]
> *Sent:* Wednesday, June 8, 2016 1:46 PM
> *Subject:* Re: Access to storm fieldsGrouping hashing method in bolt/spout
>
> I cannot completely follow your use case and what you want to accomplish...
>
> However, even though there is no official API to call the hash function, it is actually pretty simple. Storm internally creates a List of the fieldsGrouping attributes and calls .hashCode() on it (and of course applies a modulo afterwards).
>
> I actually wrote a wrapper to call the internal hash function once (it works for 0.9.3; I am not sure whether it is compatible with newer versions). You can find the code here:
> https://github.com/mjsax/aeolus/tree/master/aeolus-storm-connector
>
> The project assembles a Java class with a static method that exposes a simple-to-use Java API, which calls internal Storm core code to get the receiver task ID:
>
> StormConnector.getFieldsGroupingReceiverTaskId(
>     TopologyContext ctx,
>     String producer-component-id,
>     String output-stream-id,
>     String receiver-component-id,
>     List<Object> tuple);
>
> -Matthias
>
> On 06/08/2016 07:02 PM, Jason Kania wrote:
> > Hello,
> >
> > I am wondering whether there is a means to access the hashing method/function that Storm applies for the fieldsGrouping method. I would like to generate a token that will hash back to the current node so that subsequent processing can come back to the same node. I realize that generating an applicable token would be trial and error, but I would like to take advantage of shuffleGrouping to assign tasks and then use fieldsGrouping to ensure that the rest of the work for the task comes back to the same node.
> >
> > Thanks,
> >
> > Jason
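The following standalone Java sketch illustrates the hashing rule Matthias describes. It does not depend on Storm. It assumes that the receiver is chosen by taking `List.hashCode()` of the grouping values, applying a floor modulo by the number of receiver tasks, and using the result as an index into the receiver task IDs sorted in ascending order. That is my reading of how fields grouping behaves, but verify it against the Storm version you actually run, since Matthias's wrapper was only tested with 0.9.3. The class and method names (`FieldsGroupingSketch`, `chooseTask`, `findTokenFor`) and the task IDs are hypothetical. `findTokenFor` performs the trial-and-error token search from Jason's original question.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Standalone sketch (no Storm dependency) of fields-grouping task selection.
 * ASSUMPTIONS, not Storm's public API: the grouping values are hashed with
 * List.hashCode(), reduced with a floor modulo by the number of receiver
 * tasks, and used as an index into the receiver task IDs sorted ascending.
 */
public class FieldsGroupingSketch {

    /** Hypothetical helper: returns the receiver task ID for these grouping values. */
    public static int chooseTask(List<?> groupingValues, List<Integer> receiverTaskIds) {
        if (receiverTaskIds.isEmpty()) {
            throw new IllegalArgumentException("no receiver tasks");
        }
        List<Integer> sorted = new ArrayList<>(receiverTaskIds);
        Collections.sort(sorted);
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(groupingValues.hashCode(), sorted.size());
        return sorted.get(index);
    }

    /**
     * Hypothetical helper: trial and error over "prefix-0", "prefix-1", ...
     * until a single-field token routes to targetTaskId.
     * Returns null if no candidate matches within maxAttempts.
     */
    public static String findTokenFor(int targetTaskId, List<Integer> receiverTaskIds,
                                      String prefix, int maxAttempts) {
        if (!receiverTaskIds.contains(targetTaskId)) {
            throw new IllegalArgumentException("target task is not a receiver");
        }
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String candidate = prefix + "-" + attempt;
            if (chooseTask(List.of(candidate), receiverTaskIds) == targetTaskId) {
                return candidate;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Five receiver tasks, one per box; the IDs are made up for illustration.
        List<Integer> tasks = List.of(5, 6, 7, 8, 9);

        // List.of("a").hashCode() == 31 + "a".hashCode() == 128; 128 mod 5 == 3 -> task 8.
        System.out.println(chooseTask(List.of("a"), tasks)); // prints 8

        // Find a token that a downstream fieldsGrouping would route back to task 6.
        String token = findTokenFor(6, tasks, "job42", 1000);
        System.out.println(token + " routes to task " + chooseTask(List.of(token), tasks));
    }
}
```

The sketch uses String tokens because `String.hashCode()` is fixed by the Java specification, so every worker JVM computes the same hash for a given token. Objects with identity-based hash codes would not be safe keys for this trick. Satish's `emitDirect` approach avoids both the search and the dependency on Storm's internal hashing.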
