Thank you, Bhupesh and Ram. I appreciate your quick responses. I see that the ThrottlingStatsListener.processStats() method is called whenever new stats are received from the operators. How frequently do the operators send these stats? Is it at the end of every window?
Thanks,
Vishal

On Sat, Dec 3, 2016 at 1:46 PM, Munagala Ramanath <[email protected]> wrote:

To further clarify Bhupesh's comment: suppose you determine, in window N of the input operator, that the data-reading phase is complete, and you send the control tuple on the dedicated port to the output operator in window N+1. If the downstream operators P_i (including the output operator) are processing their respective windows W_i, the output operator will not actually see that control tuple until all the W_i have reached N+1.

Another option is to use the OperatorRequest mechanism to communicate among the operators out-of-band; an example is at:
https://github.com/DataTorrent/examples/tree/master/tutorials/throttle

That example shows how to modulate the speed of upstream operators, but it can be adapted to your scenario by checking and recording the "completion status" of all the operators.

Ram

On Sat, Dec 3, 2016 at 5:10 AM, Bhupesh Chawda <[email protected]> wrote:

Hi Vishal,

An operator processes a window only after the previous window has been completely processed. When you send the control tuple in a new window, you can be sure that all previous windows have been processed. That is why I asked you to send the control tuple in a new window.

For shutdown, you can try throwing a ShutdownException() from the input operator. This will propagate through the entire DAG and shut down all the operators in sequence.

~ Bhupesh

On Dec 3, 2016 18:15, "Vishal Agrawal" <[email protected]> wrote:

Thank you, Bhupesh.

Another catch is that just because the input operator has processed the last record doesn't mean all the intermediate operators have processed it as well. How can I ensure that all the operators have processed all the records before performing the write operation?

Also, is there a way to shut down the DAG programmatically once it has performed the write operation?
Thanks,
Vishal

On Fri, Dec 2, 2016 at 11:11 PM Bhupesh Chawda <[email protected]> wrote:

Hi Vishal,

The support for such operations is currently being enhanced in Apex. For now, you can do the following:

- Have an additional output port in your input operator as well as an input port in the "Writer" operator.
- Once the input operator has read and emitted all the data that it wanted to, you can send a tuple on the new port that you have created. This tuple will act as your signal. Make sure to do this in a new window: ideally, if the input is done in window x, send this tuple in window x+1.
- When you receive this tuple on the Writer operator, you can perform the write operation on the external system.

~ Bhupesh

On Sat, Dec 3, 2016 at 3:56 AM, Vishal Agrawal <[email protected]> wrote:

Hi,

I am performing a batch operation. My input operator reads multiple files line by line, and then a bunch of operators manipulate the records to evaluate the result. My output operator is supposed to write the final result to an external system once all the records from each of the files have been processed.

On completion of reading all the files, how can I trigger an event that will inform my output operator to perform the write operation on the external system?

Thanks,
Vishal
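
The signal-tuple approach suggested earlier in this thread can be sketched as a small plain-Java simulation. This is only an illustration of the ordering idea (data ends in window x, signal goes out in window x+1, the writer acts only on the signal). It does not use any Apex operator or port classes, and every name in it (EndOfBatchSignalSketch, Writer, Result, run, tuplesPerWindow) is hypothetical. The simulation assumes each window is fully processed before the next one begins, which is the guarantee Bhupesh describes for a real operator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

// Plain-Java simulation of the signal-tuple pattern; no Apex classes are used.
// All class, field, and method names here are hypothetical.
final class EndOfBatchSignalSketch {

    /** Outcome of one simulated run. */
    static final class Result {
        final List<String> written;  // records handed to the "external system"
        final int lastDataWindow;    // window x in which the input ran dry
        final int signalWindow;      // window in which the signal tuple was sent

        Result(List<String> written, int lastDataWindow, int signalWindow) {
            this.written = written;
            this.lastDataWindow = lastDataWindow;
            this.signalWindow = signalWindow;
        }
    }

    /** Stand-in for the Writer operator: buffers data, writes only on the signal. */
    static final class Writer {
        private final List<String> buffered = new ArrayList<>();
        private final List<String> written = new ArrayList<>();

        // Data port: hold results until the signal arrives.
        void onData(String record) {
            buffered.add(record);
        }

        // Signal port: all earlier windows are complete, so write everything.
        void onSignal() {
            written.addAll(buffered);
            buffered.clear();
        }
    }

    /** Simulates windows one at a time; each window finishes before the next starts. */
    static Result run(List<String> lines, int tuplesPerWindow) {
        if (tuplesPerWindow < 1) {
            throw new IllegalArgumentException("tuplesPerWindow must be at least 1");
        }
        Iterator<String> input = lines.iterator();
        Writer writer = new Writer();
        int lastDataWindow = 0;  // 0 means the input is not finished yet
        int signalWindow = 0;    // 0 means the signal has not been sent yet

        for (int window = 1; signalWindow == 0; window++) {
            if (lastDataWindow != 0) {
                // Input finished in an earlier window: send the signal in this new one.
                writer.onSignal();
                signalWindow = window;
                continue;
            }
            // Emit up to tuplesPerWindow records; upper-casing stands in for the
            // intermediate operators that manipulate each record.
            for (int i = 0; i < tuplesPerWindow && input.hasNext(); i++) {
                writer.onData(input.next().toUpperCase(Locale.ROOT));
            }
            if (!input.hasNext()) {
                lastDataWindow = window;
            }
        }
        return new Result(writer.written, lastDataWindow, signalWindow);
    }

    public static void main(String[] args) {
        Result r = run(Arrays.asList("a", "b", "c"), 2);
        System.out.println(r.written + ": data ended in window " + r.lastDataWindow
                + ", signal sent in window " + r.signalWindow);
    }
}
```

In a real application the two Writer methods would correspond to the two input ports of the output operator, and the window loop would be driven by the platform rather than by application code.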
