My bad, Ravi; I did not explain my point properly.
As you said, when you call outputCollector.fail(tuple) in the bolt,
ISpout.fail(Object messageId) gets invoked on the spout.
You may be passing a String as this messageId, but the parameter type is
Object.
What I meant is that you can pass the same object you emit as the messageId
(perhaps naming it better for clarity).
To elaborate, let's say I pass a simple Java bean from the spout to the bolt.
Here is my sample bean:
import java.io.Serializable;

public class Bean implements Serializable {
    private String a;             // your custom serializable attribute
    private int b;                // your custom serializable attribute
    private int c;                // your custom serializable attribute
    private String msgId;         // your custom serializable attribute
    private String failureReason; // failure reason, to be populated inside the bolt when the tuple fails

    // getter and setter methods
}
In your spout, inside nextTuple (taking the word count example from the
storm-starter project):
public void nextTuple() {
    try {
        final String[] words = new String[] { "nathan", "mike", "jackson",
                "golda", "bertels" };
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        String msgId = word + UUID.randomUUID().toString();
        Bean b = new Bean();
        b.setA("String A");
        b.setB(123);
        b.setC(456);
        b.setMsgId(msgId); // not strictly necessary
        // Emit the bean as a tuple value and also as the message id.
        _collector.emit(new Values(word, b), b);
        LOG.info("Exit nextTuple Method ");
    } catch (Exception ex) {
        ex.printStackTrace();
    }
    LOG.info("Final Exit nextTuple method ");
}
Note the _collector.emit call: I am passing the same bean object as the
messageId.
And the declareOutputFields method is:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "correlationObject"));
}
Now, in my bolt's execute method:
@Override
public void execute(Tuple tuple) {
    // Field index 1 is "correlationObject", the bean emitted by the spout.
    Bean b1 = (Bean) tuple.getValue(1);
    b1.setFailureReason("It failed ");
    _collector.fail(tuple);
}
Now, when _collector.fail is called, the spout's fail method gets
invoked:
public void fail(Object msgId) {
    Bean b1 = (Bean) msgId;
    String failureReason = b1.getFailureReason();
}
You will see the failureReason you set inside your bolt arrive here, inside
the fail method.
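To connect this back to your original goal (replay, or park the message for admins), here is a rough sketch of what the spout could do with that failure reason once fail has it. It is plain Java with no Storm dependency so it can be run on its own. Everything in it is my own assumption for illustration: the class name FailDecisionSketch, the Action enum, the retry counter, and the convention that a reason starting with "PERMANENT" means "do not replay".

```java
import java.util.HashMap;
import java.util.Map;

class FailDecisionSketch {
    enum Action { REPLAY, PARK_FOR_ADMIN }

    // Hypothetical retry bookkeeping keyed by message id; not part of Storm.
    private final Map<String, Integer> attempts = new HashMap<>();
    private final int maxRetries;

    FailDecisionSketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Intended to be called from the spout's fail(Object msgId) after casting
    // msgId to Bean and reading getMsgId() and getFailureReason().
    Action decide(String msgId, String failureReason) {
        int failures = attempts.merge(msgId, 1, Integer::sum);
        // Assumed convention: the bolt prefixes unrecoverable errors with "PERMANENT".
        boolean permanent = failureReason != null && failureReason.startsWith("PERMANENT");
        if (permanent || failures > maxRetries) {
            attempts.remove(msgId); // stop tracking once we give up
            return Action.PARK_FOR_ADMIN;
        }
        return Action.REPLAY;
    }

    public static void main(String[] args) {
        FailDecisionSketch sketch = new FailDecisionSketch(2);
        System.out.println(sketch.decide("m1", "TRANSIENT: timeout"));   // REPLAY
        System.out.println(sketch.decide("m1", "TRANSIENT: timeout"));   // REPLAY
        System.out.println(sketch.decide("m1", "TRANSIENT: timeout"));   // PARK_FOR_ADMIN
        System.out.println(sketch.decide("m2", "PERMANENT: bad input")); // PARK_FOR_ADMIN
    }
}
```

In a real spout, REPLAY would mean emitting the bean again with the same message id, and PARK_FOR_ADMIN would mean writing it to whatever queue you use outside Storm.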
Again, I am not saying that this is the best way to achieve what you want;
I am just proposing one way it can be done.
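One caveat I should flag, as far as I understand Storm's internals: the messageId object stays with the spout, and the bolt only sees the mutation-sharing copy of the bean when the tuple is handed over by reference, which I believe happens only when the spout and bolt run in the same worker JVM. If the tuple crosses workers it gets serialized, so the bolt would be changing a copy. The sketch below is plain Java, not Storm code, and uses Java serialization as a stand-in for whatever serializer Storm is configured with. The class SharedReferenceSketch and its helper methods are hypothetical names of mine.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SharedReferenceSketch {
    // Cut-down version of the bean above, keeping only the field that matters here.
    static class Bean implements Serializable {
        private static final long serialVersionUID = 1L;
        private String failureReason;
        String getFailureReason() { return failureReason; }
        void setFailureReason(String reason) { this.failureReason = reason; }
    }

    // Round-trips the bean through serialization, mimicking a tuple that
    // leaves one JVM and is rebuilt in another.
    static Bean copyViaSerialization(Bean original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Bean) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns what the "spout" would read back after the "bolt" sets a reason.
    static String reasonSeenBySpout(boolean crossesSerialization) {
        Bean heldBySpout = new Bean(); // the object used as the message id
        Bean seenByBolt = crossesSerialization
                ? copyViaSerialization(heldBySpout)
                : heldBySpout;
        seenByBolt.setFailureReason("It failed");
        return heldBySpout.getFailureReason();
    }

    public static void main(String[] args) {
        System.out.println(reasonSeenBySpout(false)); // It failed
        System.out.println(reasonSeenBySpout(true));  // null
    }
}
```

So if your topology spans more than one worker, please test this before relying on it; in that case the external datastore you mentioned may still be the safer route.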
Hope this helps.
Thanks
Ankur
On Mon, Jan 4, 2016 at 2:24 PM, Ravi Sharma <[email protected]> wrote:
> Hi Ankur,
> The relevant Storm APIs work like this:
>
> Bolts receive a Tuple, which is an immutable object.
> Once something fails in a bolt, we call outputCollector.fail(tuple),
>
> which in turn invokes the spout's ISpout.fail(Object messageId) method.
>
>
>
> Now the spout gets only the messageId back (which I created when processing
> started from that spout), so I am not sure how any other information, whether
> an object, boolean, integer, or String, can be passed from the bolts to the
> spout after a failure.
>
> Thanks
> Ravi
>
> On Sun, Jan 3, 2016 at 7:00 PM, Ankur Garg <[email protected]> wrote:
>
>> Hi Ravi,
>>
>> Maybe a very naive answer, but I am posting it anyway:
>>
>> I am assuming that once you catch the failed tuple inside the fail method
>> of your spout, all you need to decide is whether this tuple should be
>> replayed or not.
>>
>> I am also assuming the objects you are sending between the spout and bolts
>> are already serializable. How about adding the extra information for
>> replaying the tuple to the same object? (Since you are already thinking of
>> storing it in some external storage, I am assuming this information is
>> serializable too.) It may even be a simple boolean flag.
>>
>> For example:
>>
>> The original tuple you are sending may be
>>
>> class OrigTuple implements Serializable
>> {
>> ObjectA a;
>> ObjectB b;
>> }
>>
>> I am assuming a and b are both serializable or marked transient.
>>
>> Now, in case of failure, you can also attach an object C that contains
>> failure information, or a simple boolean flag that tells the spout the tuple
>> needs to be replayed. For tuples that do not need to be replayed, the flag
>> keeps its default value of false.
>>
>>
>> Like I said before, it is a very simple thought, but I think
>> it may work based on the info you provided and the assumptions I made.
>>
>> Thanks
>> Ankur
>>
>>
>>
>> On Sun, Jan 3, 2016 at 3:59 PM, Ravi Sharma <[email protected]> wrote:
>>
>>> Hi All,
>>> I would like to send some extra information back to the spout when a tuple
>>> fails in some bolt, so that the spout can decide whether to replay it or
>>> just put the message into a queue outside Storm for admins to view.
>>>
>>> So is there any way I can attach more information when sending a
>>> failed tuple back to the spout?
>>>
>>> One way I can think of is keeping such information outside Storm in some
>>> datastore, keyed by tuple id, so the spout can look it up, but I am looking
>>> for a way to do it via Storm without bringing in another integration/datastore.
>>>
>>>
>>> Thanks
>>> Ravi.
>>>
>>
>>
>