Please see Threads section at http://apex.apache.org/docs/apex/development_best_practices/

Note that an operator has only one thread created by the Apex that it can use to interact with the system. Starting with 3.5.0 release, this is enforced by the platform and it will throw an exception if an operator tries to emit on a different thread.

Thank you,

Vlad

/Join us at Apex Big Data World-San Jose <http://www.apexbigdata.com/san-jose.html>, April 4, 2017/ http://www.apexbigdata.com/san-jose-register.html <http://www.apexbigdata.com/san-jose-register.html>
On 2/21/17 11:54, Munagala Ramanath wrote:
To amplify Sandesh's answer a bit, the main operator thread invokes user callbacks like beginWindow(), endWindow(), process() method of input ports, and emitTuples() in input operators.

Additionally, if the operator implements the IdleTimeHandler, and if the operator is deemed to be idle, the handleIdleTime() callback will be called. All tuple output must be done in one of these callbacks.

So you can check the thread-safe queue in any of these callbacks and emit output tuples as needed.

Ram

On Tue, Feb 21, 2017 at 11:42 AM, Sandesh Hegde <sand...@datatorrent.com <mailto:sand...@datatorrent.com>> wrote:

    -Removed dev@

    Operators can implement idle Time Handler.
    
https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.IdleTimeHandler.html
    
<https://www.datatorrent.com/docs/apidocs/com/datatorrent/api/Operator.IdleTimeHandler.html>

    On Tue, Feb 21, 2017 at 11:33 AM Sunil Parmar
    <spar...@threatmetrix.com <mailto:spar...@threatmetrix.com>> wrote:

        Ram,
        Thanks for the prompt response. If we use the approach you
        suggested we’re dependent on main thread’s process call I.e.
        Tuples in the thread safe queue gets only processed when main
        thread is processing incoming tuples. How can we explicitly
        call the process from polling of delay queue ?

        Just for reference here’s the sample code snippet for our
        operator.

        public class MyOperator extends BaseOperator implements

                Operator.ActivationListener<Context.OperatorContext> {

        …..

        @InputPortFieldAnnotation

        public transient DefaultInputPort<String> kafkaStreamInput =

        new DefaultInputPort<String>() {

                List<String> errors = new ArrayList<String>();

        @Override

        public void process(String consumerRecord) {

        //Code for normal tuple process

        //Code to poll thread safe queue

        }


        *————————————**—————————————————————————*
        *From: *Munagala Ramanath <r...@datatorrent.com
        <mailto:r...@datatorrent.com>>
        *To: *users@apex.apache.org <mailto:users@apex.apache.org>
        *CC: *"d...@apex.apache.org <mailto:d...@apex.apache.org>"
        <d...@apex.apache.org <mailto:d...@apex.apache.org>>, Allan De
        Leon <adel...@threatmetrix.com
        <mailto:adel...@threatmetrix.com>>, Tim Zhu
        <t...@threatmetrix.com <mailto:t...@threatmetrix.com>>
        *Subject: *Re: Occasional Out of order tuples when emitting
        from a thread
        *Date: *2017-02-21 10:08 (-0800)
        *List: *users@apex.apache.org
        <https://lists.apache.org/list.html?users@apex.apache.org>

        Please note that tuples should not be emitted by any thread other than 
the
        main operator thread.

        A common pattern is to use a thread-safe queue and have worker threads
        enqueue
        tuples there; the main operator thread then pulls tuples from the queue 
and
        emits them.

        Ram

        _______________________________________________________

        Munagala V. Ramanath

        Software Engineer

        E:r...@datatorrent.com <mailto:r...@datatorrent.com>  | M:(408) 331-5034 
<tel:%28408%29%20331-5034>  | Twitter: @UnknownRam

www.datatorrent.com <http://www.datatorrent.com> |apex.apache.org <http://apex.apache.org>
        From: Sunil Parmar <spar...@threatmetrix.com
        <mailto:spar...@threatmetrix.com>> Date: Tuesday, February 21,
        2017 at 10:05 AM To: "users@apex.apache.org
        <mailto:users@apex.apache.org>" <users@apex.apache.org
        <mailto:users@apex.apache.org>>, "d...@apex.apache.org
        <mailto:d...@apex.apache.org>" <d...@apex.apache.org
        <mailto:d...@apex.apache.org>> Cc: Allan De Leon
        <adel...@threatmetrix.com <mailto:adel...@threatmetrix.com>>,
        Tim Zhu <t...@threatmetrix.com <mailto:t...@threatmetrix.com>>
        Subject: Occasional Out of order tuples when emitting from a
        thread
        Hi there,
        We have the following setup:

          * we have a generic operator that’s processing tuples in its
            input port
          * in the input port’s process method, we check for a
            condition, and:
              o if the condition is met, the tuple is emitted to the
                next operator right away (in the process method)
              o Otherwise, if the condition is not met, we store the
                tuple  in some cache and we use some threads that
                periodically check the condition to become true. Once
                the condition is true, the threads call the emit
                method on the stored tuples.

        With this setup, we occasionally encounter the following error:
        2017-02-15 17:29:09,364 ERROR
        com.datatorrent.stram.engine.GenericNode: Catastrophic Error:
        Out of sequence BEGIN_WINDOW tuple 58a4046100003b7f on port
        transformedJSON while expecting 58a4046100003b7e
        Is there a way to make the above work correctly?
        If not, can you recommend a better way of doing this?
        How can we ensure window assignment is done synchronously
        before emitting tuples ?
        Thanks very much in advance…
        -allan

-- /Join us at ApexBigDataWorld-San Jose
    <http://www.apexbigdata.com/san-jose.html>, April 4, 2017!/
    http://www.apexbigdata.com/san-jose-register.html
    <http://www.apexbigdata.com/san-jose-register.html>

--

_______________________________________________________

Munagala V. Ramanath

Software Engineer

E: r...@datatorrent.com <mailto:r...@datatorrent.com> | M: (408) 331-5034 | Twitter: @UnknownRam

www.datatorrent.com <http://www.datatorrent.com> | apex.apache.org <http://apex.apache.org>

Reply via email to