I'd recommend using the additional output port solution outlined by Bhupesh. There are a few Apex applications in the field that leverage that solution.

Thank you,

Vlad

On 12/4/16 11:45, Vishal Agrawal wrote:
Thank you Bhupesh and Ram. Appreciate your quick response.

I see that the ThrottlingStatsListener.processStats() method gets called whenever new stats are received from the operators. How frequently do the operators send these stats? Is it at the end of every window?


Thanks,
Vishal



On Sat, Dec 3, 2016 at 1:46 PM, Munagala Ramanath <r...@datatorrent.com <mailto:r...@datatorrent.com>> wrote:

    To further clarify Bhupesh's comment: suppose that in window N the
    input operator determines that the data-reading phase is complete,
    and in window N+1 it sends the control tuple on the dedicated port
    to the output operator. If the downstream operators P_i (including
    the output operator) are processing their respective windows W_i,
    the output operator will not actually see that control tuple until
    all the W_i have reached N+1.
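
    The window barrier described above can be illustrated with a small
    toy model in Python (not Apex code). It assumes a linear chain of
    operators and a simplifying rule: a stage starts window w only after
    it has finished w-1 itself and received all of w from upstream. The
    function names and per-window costs are hypothetical.

```python
def finish_times(costs):
    """Return finish[s][w], the time at which stage s finishes window w.

    costs[s][w] is a hypothetical time that stage s spends on window w.
    Stage 0 is the input operator and the last stage is the output
    operator. Toy rule: a stage starts window w only after it has
    finished window w - 1 and its upstream stage has finished window w.
    """
    finish = [[0] * len(row) for row in costs]
    for s, row in enumerate(costs):
        for w, cost in enumerate(row):
            own_prev = finish[s][w - 1] if w > 0 else 0
            upstream = finish[s - 1][w] if s > 0 else 0
            finish[s][w] = max(own_prev, upstream) + cost
    return finish


def control_tuple_seen_at(costs, control_window):
    """Time at which the output operator starts the window carrying the
    control tuple. Under the toy rule, it must have finished
    control_window - 1 itself and received control_window from upstream."""
    finish = finish_times(costs)
    out = len(costs) - 1
    return max(finish[out][control_window - 1], finish[out - 1][control_window])


# Input -> slow intermediate operator -> output. Data ends in window N = 1,
# so the control tuple travels in window N + 1 = 2.
costs = [[1, 1, 1], [5, 1, 1], [1, 1, 1]]
seen = control_tuple_seen_at(costs, control_window=2)
done_with_n = [row[1] for row in finish_times(costs)]
print(seen, done_with_n)  # 8 [2, 7, 8]
```

    In this model the intermediate operator is slow in window 0. Even so,
    the output operator reaches the control tuple only after every stage
    has finished window N.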

    Another option is to use the OperatorRequest mechanism to
    communicate among the operators
    out-of-band; an example is at:
    https://github.com/DataTorrent/examples/tree/master/tutorials/throttle

    That example shows how to modulate the speed of upstream operators,
    but it can be adapted for your scenario by checking and recording
    the "completion status" of all the operators.
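
    As a rough sketch of the bookkeeping side of that adaptation, here is
    a toy Python model. It is a hypothetical class, not the Apex
    StatsListener or OperatorRequest API. It records a sticky completion
    flag per operator and reports when every tracked operator is done.
    How each operator would actually report its status is left open.

```python
class CompletionTracker:
    """Toy stand-in for a central listener that records each operator's
    "completion status" (hypothetical, not an Apex API).

    Flags are sticky: once an operator has reported completion, a later
    report without the flag does not reset it.
    """

    def __init__(self, operator_names):
        self._done = {name: False for name in operator_names}

    def record(self, operator_name, done):
        if operator_name not in self._done:
            raise KeyError(f"unknown operator: {operator_name}")
        self._done[operator_name] = self._done[operator_name] or done

    def pending(self):
        """Operators that have not reported completion yet, sorted by name."""
        return sorted(name for name, done in self._done.items() if not done)

    def all_done(self):
        return not self.pending()


tracker = CompletionTracker(["reader", "parser", "aggregator"])
tracker.record("reader", done=True)
tracker.record("parser", done=True)
tracker.record("reader", done=False)  # a later report without the flag
print(tracker.all_done(), tracker.pending())  # False ['aggregator']
tracker.record("aggregator", done=True)
print(tracker.all_done())  # True
```

    Once all_done() is true, it would be safe to signal the output
    operator to perform its write.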

    Ram

    On Sat, Dec 3, 2016 at 5:10 AM, Bhupesh Chawda
    <bhup...@datatorrent.com <mailto:bhup...@datatorrent.com>> wrote:

        Hi Vishal,

        A window is processed by an operator only when the previous
        window has been completely processed. When you send the control
        tuple in a new window, you can be sure that all previous
        windows have been processed.

        That is the reason I asked you to send the control tuple in a
        new window.

        To shut down, you can try throwing a ShutdownException from
        the input operator. This will propagate through the entire DAG
        and shut down all the operators in sequence.
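
        Here is a toy Python model of that behavior. It assumes that "in
        sequence" means upstream operators shut down before the operators
        that consume from them. The ShutdownException below is a local
        stand-in, not the Apex class, and the operator names are
        hypothetical.

```python
from graphlib import TopologicalSorter  # Python 3.9+


class ShutdownException(Exception):
    """Local stand-in for Apex's ShutdownException; not the real class."""


def reader(window_id, windows_of_input=3):
    """Hypothetical input operator: emits for a few windows, then asks
    for shutdown by raising ShutdownException."""
    if window_id >= windows_of_input:
        raise ShutdownException()


def run_until_shutdown(dag, emit_window, max_windows=1_000):
    """dag maps each operator to the set of operators upstream of it.

    Returns how many windows the input emitted, and the assumed
    shutdown order (upstream first, i.e. a topological order).
    """
    emitted = 0
    try:
        for window_id in range(max_windows):
            emit_window(window_id)
            emitted += 1
    except ShutdownException:
        pass
    return emitted, list(TopologicalSorter(dag).static_order())


dag = {"reader": set(), "transform": {"reader"}, "writer": {"transform"}}
print(run_until_shutdown(dag, reader))  # (3, ['reader', 'transform', 'writer'])
```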

        ~ Bhupesh

        On Dec 3, 2016 18:15, "Vishal Agrawal"
        <vishal.agrawal...@gmail.com
        <mailto:vishal.agrawal...@gmail.com>> wrote:

            Thank you Bhupesh.

            Another catch is that just because the input operator has
            processed the last record doesn't mean all the intermediate
            operators have processed it as well. How can I ensure that
            all the operators have processed all the records before
            performing the write operation?

            Also, is there a way to shut down the DAG programmatically
            once it has performed the write operation?


            Thanks,
            Vishal


            On Fri, Dec 2, 2016 at 11:11 PM Bhupesh Chawda
            <bhup...@datatorrent.com <mailto:bhup...@datatorrent.com>>
            wrote:

                Hi Vishal,

                The support for such operations is currently being
                enhanced in Apex.

                For now, you can do the following:
                 - Have an additional output port in your input
                operator as well as an input port in the "Writer"
                operator.
                 - Once the input operator has read and emitted all
                the data that it wanted to, you can send a tuple on
                the new port that you have created. This tuple will
                act as your signal. Make sure to do this in a new
                window: ideally, if the input is done in window x,
                send this tuple in window x+1.
                 - When you receive this tuple on the Writer operator,
                you can perform the write operation on the external
                system.
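
                The steps above can be sketched as a toy Python model
                (not Apex code). The input operator emits records on a
                "data" port and then, in the window after the last
                record, sends a single control tuple on a "control"
                port. The writer buffers results and performs one
                external write when the control tuple arrives. Port
                names, window sizing, and the list standing in for the
                external system are assumptions.

```python
def input_operator(files, records_per_window=2):
    """Toy input operator yielding (window_id, port, payload) events.

    Data records go out on the "data" port. After the last record, one
    control tuple goes out on a separate "control" port in the next
    window (x + 1). Port names and window sizing are hypothetical.
    """
    lines = [line for f in files for line in f]
    window_id = 0
    for start in range(0, len(lines), records_per_window):
        for line in lines[start:start + records_per_window]:
            yield window_id, "data", line
        window_id += 1
    yield window_id, "control", "INPUT_DONE"


class Writer:
    """Toy "Writer" operator: buffers results, writes once on the signal."""

    def __init__(self):
        self.buffer = []
        self.external_writes = []  # stands in for the external system
        self.current_window = -1

    def process(self, window_id, port, payload):
        if window_id < self.current_window:
            raise ValueError("windows must arrive in order")
        self.current_window = window_id
        if port == "data":
            self.buffer.append(payload)
        elif port == "control":
            # Single write of everything accumulated so far.
            self.external_writes.append(list(self.buffer))


writer = Writer()
events = list(input_operator([["a", "b", "c"], ["d", "e"]]))
for event in events:
    writer.process(*event)
print(events[-1])              # (3, 'control', 'INPUT_DONE')
print(writer.external_writes)  # [['a', 'b', 'c', 'd', 'e']]
```

                Because the control tuple travels in its own later window,
                every data record is already in the writer's buffer when
                the write happens.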

                ~ Bhupesh

                On Sat, Dec 3, 2016 at 3:56 AM, Vishal Agrawal
                <vishal.agrawal...@gmail.com
                <mailto:vishal.agrawal...@gmail.com>> wrote:

                    Hi,

                    I am performing a batch operation. My input
                    operator is reading multiple files line by line,
                    and then there are a bunch of operators manipulating
                    the records to evaluate the result.
                    My output operator is supposed to write the final
                    result to an external system once all the records
                    from each of the files are processed.

                    Once all the files have been read, how can I
                    trigger an event that tells my output operator to
                    perform the write operation on the external system?


                    Thanks,
                    Vishal
