Hi Satish,
Thanks for the response. That does sound something like what we need. We'll 
give it a shot to see if it covers our use case.
Thanks,
Jason

      From: Satish Duggana <[email protected]>
 To: [email protected]; Jason Kania <[email protected]> 
 Sent: Saturday, June 11, 2016 1:10 PM
 Subject: Re: Access to storm fieldsGrouping hashing method in bolt/spout
   
Hi,
It seems you want to send tuples back to the task of the bolt from which those 
tuples were received by the current bolt.
The initial bolt can send its own task ID 
(org.apache.storm.task.TopologyContext#getThisTaskId()) as one of the tuple 
fields, and the subsequent bolt can use it to emit tuples directly to the 
earlier bolt's task using `OutputCollector#emitDirect(int taskId, String 
streamId, Collection<Tuple> anchors, List<Object> tuple)`.
Hope it helps,
Satish.


On Fri, Jun 10, 2016 at 7:47 PM, Jason Kania <[email protected]> wrote:

Thanks for the response and the code reference.
To explain the use case, we would like to be able to use shuffle grouping for 
initial load balancing but then use fields grouping and some generated token to 
be able to get back to the same bolt after passing messages to a different bolt 
doing other processing. The reason to get back to the same box is because of a 
very large media file that needs to be pulled and processed on that box causing 
us to want to be sticky to that box.
We have tried using fieldsGrouping on its own but we end up with asymmetries in 
the load across the boxes simply because we only have 5 boxes doing this 
specific processing and hashing does not allow it to be evenly balanced.
Right now, we are generating random token values and then sending them out to 
be collected by the different bolts. The bolts then use the tokens that have 
arrived as the fieldsGrouping key for subsequent operations. Ideally, we would 
be able to directly obtain a token that routes back to the same bolt instance 
to complete the job.
I am wondering if something like this would be worthy of an enhancement request?
Jason
      From: Matthias J. Sax <[email protected]>
 To: [email protected] 
 Sent: Wednesday, June 8, 2016 1:46 PM
 Subject: Re: Access to storm fieldsGrouping hashing method in bolt/spout
  
I cannot completely follow your use case and what you want to accomplish...

However, even though there is no official API to call the hash function, it
is actually pretty simple. Storm internally creates a List of the
fieldsGrouping attributes and calls .hashCode() on it (and of course
applies a modulo afterwards).
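
As a rough illustration of that idea (a plain-Java sketch, not Storm's actual code), the snippet below hashes a list of grouping values, reduces it modulo the number of receiver tasks, and then searches by trial and error for a token that maps to a chosen task index. The names FieldsGroupingSketch, taskIndexFor and tokenFor are hypothetical, and the use of Math.floorMod to keep the index non-negative is an assumption; the exact hashing may differ between Storm versions, and the result is an index into the receiver's task list, not a task ID.

```java
import java.util.Arrays;
import java.util.List;

final class FieldsGroupingSketch {

    // Index (0-based) into the receiver's task list for the given grouping values.
    // Assumption: List.hashCode() reduced modulo the task count; floorMod keeps
    // the result non-negative when the hash code is negative.
    static int taskIndexFor(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    // Trial and error: try candidate tokens until one maps to targetIndex.
    static String tokenFor(int targetIndex, int numTasks) {
        if (targetIndex < 0 || targetIndex >= numTasks) {
            throw new IllegalArgumentException("targetIndex out of range");
        }
        for (int i = 0; i < 1_000_000; i++) {
            String token = "token-" + i;
            if (taskIndexFor(Arrays.<Object>asList(token), numTasks) == targetIndex) {
                return token;
            }
        }
        throw new IllegalStateException("no matching token found");
    }

    public static void main(String[] args) {
        int numTasks = 5; // e.g. five tasks doing the media processing
        for (int target = 0; target < numTasks; target++) {
            System.out.println(target + " <- " + tokenFor(target, numTasks));
        }
    }
}
```

A token found this way only stays valid while the number of receiver tasks and the hashing scheme are unchanged.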

I once wrote a wrapper to call the internal hash function (it
works for 0.9.3 -- not sure if it is compatible with newer versions).
You can find the code here:
https://github.com/mjsax/aeolus/tree/master/aeolus-storm-connector

The project assembles a Java class with a static method that exposes a
simple-to-use Java API, which calls internal Storm core code to get the
receiver task ID.

StormConnector.getFieldsGroupingReceiverTaskId(
    TopologyContext ctx,
    String producerComponentId,
    String outputStreamId,
    String receiverComponentId,
    List<Object> tuple);

-Matthias

On 06/08/2016 07:02 PM, Jason Kania wrote:
> Hello,
> 
> I am wondering if there is a means to access the hashing method/function
> that storm applies for the fieldsGrouping method. I would like to
> generate a token that will hash back to the current node so that
> subsequent processing can come back to the same node. I realize that
> generating an applicable token would be trial and error, but I would
> like to take advantage of shuffleGrouping to assign tasks but then use
> fieldsGrouping to ensure that the rest of the work for the task comes
> back to the same node.
> 
> Thanks,
> 
> Jason


   



  
