Unfortunately, that is not the case in 0.9: isReady is called both before a new batch and before a batch replay. You can see this if you look at the code: isReady is called in MasterBatchCoordinator, and whether a batch is emitted or re-emitted is determined by PartitionedTridentSpoutExecutor (called by TridentSpoutExecutor), which only runs after MBC emits a tuple to the $batch stream, and that emit is gated by isReady.

SimonC
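[Editorial note: since isReady(long txid) in 0.9 cannot tell a replay from a new batch on its own, one possible coordinator-side workaround (a sketch only, not something proposed in this thread) is to exploit the fact that a replayed batch reuses the same transaction id: remember the txids the coordinator has already approved, and treat a repeated txid as a replay that should not be blocked waiting for new Kafka messages. The hasNewKafkaMessages() helper below is a hypothetical placeholder.]

import java.util.HashSet;
import java.util.Set;

// Sketch of a replay-aware isReady check for a Storm 0.9 coordinator, where
// isReady(long txid) is called before both new batches and replays. Trident
// reuses the transaction id when a batch is replayed, so a txid that was
// already approved once can reasonably be treated as a replay.
public class ReplayAwareReadyCheck {

    private final Set<Long> approvedTxids = new HashSet<Long>();

    public boolean isReady(long txid) {
        if (approvedTxids.contains(txid)) {
            // A previously approved txid coming back is almost certainly a
            // replay, so don't block it on new messages and don't hit Kafka.
            return true;
        }
        if (hasNewKafkaMessages()) {
            approvedTxids.add(txid);
            // In a real coordinator, committed txids should eventually be
            // pruned from this set so it does not grow without bound.
            return true;
        }
        return false;
    }

    // Hypothetical placeholder: a real check would compare the latest Kafka
    // offset against the last offset handed out to the emitter (via an
    // external store if the coordinator and emitter are not colocated).
    private boolean hasNewKafkaMessages() {
        return false;
    }
}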
From: Karthikeyan Muthukumarasamy [mailto:[email protected]]
Sent: 21 February 2014 01:54
To: [email protected]
Subject: Re: Problems with a coordinator and emitter of an error-handling IPartitionedTridentSpout

Hi,

This is what I remember from when I last worked on the IPartitionedSpout:

1) When new transactions are to be played, isReady() is called first. You return true or false depending on the state of your source.
2) If you return true, emitPartitionBatchNew() will be called for each Emitter instance with the same transactionId, and you are supposed to emit whatever you want to be considered part of that txn from that partition. You also need to return a metadata object (typically a Map or a List; when I tried a Java domain object, it failed) which will help you replay the exact batch, should a replay be required.
3) When a transaction has to be replayed, there is no call to isReady(); emitPartitionBatch() will be called with the metadata object that you last returned, so you should be able to replay those emits. This seems logical because there is no need to check whether you are ready when you are doing a replay.

Coming to your question about the emitter and coordinator running on the same machine: no, that's never guaranteed, so you can't use an in-memory static queue mechanism. If you need to share something between the coordinator and emitter, it should be based on an external store like Redis or a database.

Hope this helps!
MK

On Thu, Feb 20, 2014 at 12:05 PM, Simon Cooper <[email protected]> wrote:

I'm trying to implement an error-handling IPartitionedTridentSpout that limits the number of retries of a batch. The problem I've got is the interaction between the coordinator and the emitter.

The spout reads from a Kafka queue. If no messages have been put on the queue recently, the coordinator returns false from isReady. When there are messages, the coordinator returns true and lets the emitter read the messages. This leads to some rather interesting code in which the coordinator has to try and guess which offset the emitter is currently reading from, but that's beside the point.

The problem comes when a batch is retried. When a batch is retried, isReady is called with the txid to be replayed. In this situation, the coordinator must always return true, even if no more messages are on the queue, as the emitter is reading messages already on the queue from a previous batch. But the coordinator has no way of knowing whether the txid passed to isReady() is a replay or a new transaction, as the txid is a raw long, not a TransactionAttempt. So either the emitter does not replay a batch until some new messages are put on the queue, or the coordinator simply returns true on every call to isReady and the Kafka server gets hammered (and lots of txs go through the system unnecessarily). Both solutions are bad.

What's the solution to this? Do the emitter and coordinator need their own method of communication other than the interfaces provided by Trident? Are the emitter and coordinator guaranteed to always run on the same worker, so I can use an in-memory static queue or something for extra coordinator <-> emitter communication? Or am I misunderstanding the meaning of isReady()?

It would be really, really helpful if the coordinator could be passed a TransactionAttempt to isReady rather than a raw long...

Thanks,
SimonC
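[Editorial note: to make the contract MK describes concrete, here is a minimal Emitter sketch: emitPartitionBatchNew emits a fresh batch and returns a metadata object (a Map of offsets here), and emitPartitionBatch replays exactly the range recorded in that metadata. The interface signatures are the 0.9-era IPartitionedTridentSpout.Emitter ones as best recalled and may need adjusting against the exact release; KafkaPartition and the offset helpers are hypothetical stand-ins, not code from the thread.]

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.spout.ISpoutPartition;
import storm.trident.topology.TransactionAttempt;

// Hypothetical partition handle; a real spout would carry broker/topic info too.
class KafkaPartition implements ISpoutPartition {
    final int partitionId;
    KafkaPartition(int partitionId) { this.partitionId = partitionId; }
    @Override
    public String getId() { return Integer.toString(partitionId); }
}

public class OffsetRangeEmitter
        implements IPartitionedTridentSpout.Emitter<List<KafkaPartition>, KafkaPartition, Map<String, Long>> {

    @Override
    public List<KafkaPartition> getOrderedPartitions(List<KafkaPartition> allPartitionInfo) {
        return new ArrayList<KafkaPartition>(allPartitionInfo);
    }

    @Override
    public Map<String, Long> emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector,
                                                   KafkaPartition partition, Map<String, Long> lastMeta) {
        // Start where the previous batch for this partition ended.
        long start = lastMeta == null ? earliestOffset(partition) : lastMeta.get("end");
        long end = latestOffset(partition);
        emitRange(collector, partition, start, end);
        // The returned metadata is stored by Trident and handed back on replay,
        // so it must pin down the exact range of this batch.
        Map<String, Long> meta = new HashMap<String, Long>();
        meta.put("start", start);
        meta.put("end", end);
        return meta;
    }

    @Override
    public void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
                                   KafkaPartition partition, Map<String, Long> meta) {
        // Replay: re-emit exactly the range recorded when the batch was first emitted.
        emitRange(collector, partition, meta.get("start"), meta.get("end"));
    }

    @Override
    public void refreshPartitions(List<KafkaPartition> partitionResponsibilities) {}

    @Override
    public void close() {}

    // --- Hypothetical Kafka plumbing below; not part of the Trident API. ---
    private long earliestOffset(KafkaPartition p) { return 0L; }
    private long latestOffset(KafkaPartition p) { return 0L; }
    private void emitRange(TridentCollector collector, KafkaPartition p, long start, long end) {
        // For each message in [start, end): collector.emit(new Values(...));
    }
}

[Because the metadata is all the emitter gets back on a replay, a retry counter could also be kept in it, or in an external store keyed by txid, to implement the retry limit Simon is after.]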
