Unfortunately, that is not the case in 0.9. isReady is called both before a new 
batch and before a batch replay. You can see this if you look at the code - isReady 
is called in MasterBatchCoordinator, and whether a batch is emitted or re-emitted 
is determined by PartitionedTridentSpoutExecutor, which is invoked by 
TridentSpoutExecutor. The latter is only called after MBC emits a tuple to the 
$batch stream, which is controlled by isReady.

SimonC

From: Karthikeyan Muthukumarasamy [mailto:[email protected]]
Sent: 21 February 2014 01:54
To: [email protected]
Subject: Re: Problems with a coordinator and emitter of an error-handling 
IPartitionedTridentSpout

Hi,
This is what I remember from when I last worked on the IPartitionedSpout:
1) When a new transaction is to be played, isReady() is called first. You 
return true or false depending on the state of your source.
2) If you return true, emitPartitionBatchNew() will be called for each Emitter 
instance with the same transactionId, and you are supposed to emit whatever you 
want considered part of that txn from that partition. You also need to return 
a metadata object (typically a Map or a List; when I tried a Java domain 
object, it failed) which will help you replay the exact batch, should a replay 
be required.
3) When a transaction has to be replayed, there is no call to isReady(); 
emitPartitionBatch() will be called with the metadata object that you last 
returned, so you should be able to replay those emits. This seems logical, 
because there is no need to check whether you are ready when doing a replay.
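The lifecycle above can be sketched roughly as follows. This is a simplified stand-in, not the real Storm interfaces - the class, method signatures, and the fixed batch size of 10 are all illustrative assumptions; only the call order and the plain-Map metadata convention come from the description above.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in modelling the call order of a partitioned Trident spout:
// isReady() -> emitPartitionBatchNew() (returns metadata) -> emitPartitionBatch()
// on replay, driven by that same metadata.
public class SpoutLifecycleSketch {

    // New batch: emit a range of messages and record it as metadata.
    // A plain Map is used because (per the thread) plain Maps/Lists
    // serialize reliably where custom domain objects did not.
    static Map<String, Long> emitPartitionBatchNew(long txid, long startOffset) {
        long endOffset = startOffset + 10; // pretend we emitted 10 messages
        Map<String, Long> meta = new HashMap<>();
        meta.put("start", startOffset);
        meta.put("end", endOffset);
        return meta;
    }

    // Replay: re-emit exactly the range recorded in the metadata.
    static long[] emitPartitionBatch(long txid, Map<String, Long> meta) {
        return new long[] { meta.get("start"), meta.get("end") };
    }

    public static void main(String[] args) {
        boolean ready = true;                                   // 1) isReady()
        if (ready) {
            Map<String, Long> meta = emitPartitionBatchNew(1L, 0L); // 2) new batch
            long[] replayed = emitPartitionBatch(1L, meta);         // 3) replay
            System.out.println(replayed[0] + ".." + replayed[1]);
        }
    }
}
```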
Coming to your question on the emitter and coordinator running on the same 
machine: no, that's never guaranteed, so you can't use an in-memory static queue 
mechanism. If you need to share something between the coordinator and emitter, it 
should be based on an external store such as Redis or a database.
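The shape of that sharing might look like the sketch below. The ExternalStore interface and InMemoryStore class are hypothetical names, and the in-memory map is only a stand-in for something like Redis so the example is self-contained; in a real topology this would be a network client, precisely because the two components may live in different workers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of sharing state between coordinator and emitter via an
// external store, since they are not guaranteed to share a JVM.
public class SharedStateSketch {

    interface ExternalStore {
        void put(String key, String value);
        String get(String key);
    }

    // In-memory stand-in for an external store like Redis (sketch only).
    static class InMemoryStore implements ExternalStore {
        private final Map<String, String> data = new ConcurrentHashMap<>();
        public void put(String key, String value) { data.put(key, value); }
        public String get(String key) { return data.get(key); }
    }

    public static void main(String[] args) {
        ExternalStore store = new InMemoryStore();
        // The emitter records how far it has read on each partition...
        store.put("partition-0/lastOffset", "12345");
        // ...and the coordinator reads it back when deciding isReady().
        System.out.println(store.get("partition-0/lastOffset"));
    }
}
```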
Hope this helps!
MK

On Thu, Feb 20, 2014 at 12:05 PM, Simon Cooper 
<[email protected]> wrote:
I'm trying to implement an error-handling IPartitionedTridentSpout that limits 
the number of retries of a batch. The problem I've got is the interaction 
between the coordinator and emitter.

The spout reads from a Kafka queue. If no messages have been put on the queue 
recently, the coordinator returns false from isReady. When there are messages, 
the coordinator returns true and lets the emitter read them. This leads to some 
rather interesting code in which the coordinator has to try to guess which 
offset the emitter is currently reading from, but that's beside the point.

The problem comes when a batch is retried: isReady is called with the txid to 
be replayed. In this situation, the coordinator must always return true, even 
if no more messages are on the queue, because the emitter is re-reading 
messages already on the queue from a previous batch. But the coordinator has no 
way of knowing whether the txid passed to isReady() is a replay or a new 
transaction, as the txid is a raw long, not a TransactionAttempt.

So either the emitter does not replay a batch until some new messages are put 
on the queue, or the coordinator simply returns true in every call to isReady 
and the Kafka server gets hammered (and lots of txs go through the system 
unnecessarily). Both solutions are bad.
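One possible workaround - purely an assumption on my part, not something the thread confirms - relies on txids being handed out in increasing order: the coordinator remembers the highest txid it has already approved, and treats any txid at or below that as a replay, letting it through unconditionally. The class and field names below are illustrative, and note the remembered txid is lost if the coordinator restarts.

```java
// Hypothetical coordinator sketch: infer replays from the txid sequence.
// Assumes txids increase monotonically, so a txid at or below the highest
// one already approved must be a replay of an earlier batch.
public class ReplayAwareCoordinator {

    private long highestApprovedTxid = -1;

    // Stand-in for the real readiness check against the message source.
    private final boolean sourceHasNewMessages;

    public ReplayAwareCoordinator(boolean sourceHasNewMessages) {
        this.sourceHasNewMessages = sourceHasNewMessages;
    }

    public boolean isReady(long txid) {
        if (txid <= highestApprovedTxid) {
            return true;                 // replay: always let it through
        }
        if (sourceHasNewMessages) {
            highestApprovedTxid = txid;  // new batch approved, remember it
            return true;
        }
        return false;                    // new batch, but nothing to emit yet
    }
}
```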

What's the solution to this? Do the emitter and coordinator need their own 
method of communication other than the interfaces provided by Trident? Are the 
emitter and coordinator guaranteed to always run on the same worker, so I can 
use an in-memory static queue or something for extra coordinator <-> emitter 
communication? Or am I misunderstanding the meaning of isReady()?

It would be really helpful if the coordinator could be passed a 
TransactionAttempt in isReady rather than a raw long...

Thanks,
SimonC
