Re: [Discuss] Design of the python execution operator

2017-12-22 Thread Thomas Weise
Serialization and resource manager are not involved in CONTAINER_LOCAL.

However, CONTAINER_LOCAL is probably not what you want, since it would
force all partitions to run in the same JVM.
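
For reference, locality is a property of the stream in the DAG; a minimal
sketch ( operator and port names here are hypothetical ):

    // Pins the connected operators into the same JVM via stream locality.
    dag.addStream("toPython", upstream.output, python.input)
       .setLocality(DAG.Locality.CONTAINER_LOCAL);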

Thomas


Re: [Discuss] Design of the python execution operator

2017-12-22 Thread Ananth G
I guess my comment below regarding the overhead of serialisation in container
local is wrong? Nevertheless, having a local thread implementation gives some
benefits. For example, I am using it to decide whether to sleep when there is
no request in the queue or to spin checking for the presence of a request, so
that there are no delays in the request queue processing itself.
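
A rough sketch of that wait strategy ( illustrative only, with hypothetical
names, not the actual operator code ):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Worker loop: busy-spin or sleep while the request queue is empty,
    // depending on a configurable wait policy.
    class InterpreterWorker implements Runnable {
      final BlockingQueue<String> requests = new ArrayBlockingQueue<>(1024);
      final BlockingQueue<String> responses = new ArrayBlockingQueue<>(1024);
      volatile boolean running = true;
      boolean spin = false;      // true: lowest latency, burns a core
      long sleepMillis = 1;      // used when not spinning, frees the CPU

      public void run() {
        try {
          while (running) {
            String request = requests.poll();
            if (request == null) {
              if (!spin) {
                TimeUnit.MILLISECONDS.sleep(sleepMillis);
              }
              continue;          // spinning: recheck immediately
            }
            responses.put(process(request));
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }

      String process(String request) { return request; } // JEP call goes here
    }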

Regards,
Ananth


Re: [Discuss] Design of the python execution operator

2017-12-22 Thread Ananth G

Thanks for the comments Thomas and Pramod. 

Apologies for the delayed response on this thread. 

> I believe the thread implementation still adds some value over a
> container local approach. It is more of a “thread local” equivalent,
> which is more efficient than a container local implementation. Also, the
> number of worker threads is configurable. Setting the value to 1 lets the
> user opt out of this ( although I do not see a reason why one would ).
> There is always the overhead of a serialise/deserialise cycle even for a
> container local approach, and there is the additional possibility of
> container local not being honoured by the resource manager based on the
> state of the resources.

> Regarding the configurable key to ensure all tuples in a window are
> processed, I am adding a switch which can let the user choose ( and
> javadoc that clearly points out the issues if not waiting for the tuples
> to be completely processed ). There are pros and cons to this, and letting
> the user decide might be a better approach. The reason why I mention cons
> for waiting for the tuples to complete ( apart from the reason that Thomas
> mentioned ) is that if one of the commands that the user wrote is
> erroneous, all the subsequent calls to that interpreter thread can fail.
> An example use case: tuple A sets some value for variable x and tuple B,
> coming next, makes use of the variable x. The expression for tuple B is
> syntactically valid; it just depends on variable x. Now if variable x is
> not in memory because tuple A is a straggler, tuple B ends up in an
> erroneous interpreter state. Hence the operator might stall indefinitely,
> as the end window will be stalled forever, ultimately resulting in the
> operator being killed. This is also because the erroneous command
> corrupted the state of the interpreter itself. Of course this can happen
> to all of the threads in the interpreter worker pool, resulting in this
> state as well. Perhaps an improvement to the current implementation is to
> detect all such interpreters stalled for more than x windows and rebuild
> the interpreter thread when such a situation is detected.
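>
> A possible shape for that improvement ( purely a sketch with hypothetical
> names ):
>
>     // Watchdog sketch: count windows in which a worker produced nothing
>     // and rebuild its interpreter once a threshold is crossed.
>     public void endWindow() {
>       for (InterpreterWorker worker : workers) {
>         if (worker.respondedThisWindow()) {
>           worker.resetStalledWindows();
>         } else if (worker.incrementStalledWindows() > maxStalledWindows) {
>           worker.shutdownNow();    // abandon the corrupted interpreter
>           replaceWorker(worker);   // spawn a fresh interpreter thread
>         }
>       }
>     }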

> Thanks for the IdleTimeHandler tip, as this helped me ensure that the
> stragglers are drained irrespective of a new tuple coming in for
> processing. In the previous iteration, the stragglers could only be
> drained when a new tuple came in for processing, as the delayed responses
> queue could only be checked when there was some activity on the main
> thread.

> Thanks for raising the point about the virtual environments: this is a
> point I missed mentioning in the design description below. There is no
> support for virtual environments yet in JEP, hence the current
> limitation. However, the workaround is simple. As part of the application
> configuration, we need to provide the JAVA_LIBRARY_PATH which contains the
> path to the JEP dynamic libraries. If there are multiple python installs (
> and hence multiple JEP libraries to choose from for each of the Apex
> applications that are being deployed ), setting the right path for the
> operator JVM will result in picking the corresponding python interpreter
> version. This also essentially means that we cannot have a thread local
> deployment configuration of two python operators that belong to different
> python versions in the same JVM. The Docker approach ticket
> https://issues.apache.org/jira/browse/APEXCORE-796 should be the right fix
> for the virtual environments issue? ( but it still might not solve the
> thread local configuration deployment )

Regards,
Ananth



Re: [Discuss] Design of the python execution operator

2017-12-20 Thread Pramod Immaneni
On Wed, Dec 20, 2017 at 3:34 PM, Thomas Weise  wrote:

> It is exciting to see this move forward, the ability to use Python opens
> many new possibilities.
>
> Regarding use of worker threads, this is a pattern that we are using
> elsewhere (for example in the Kafka input operator). When the operator
> performs blocking operations and consumes little memory and/or CPU, then it
> is more economic to first use threads to increase parallelism and
> throughput (up to a limit), and then the more expensive containers for
> horizontal scaling (multiple threads to make good use of container
> resources and then scale using the usual partitioning).
>

I think there is a difference. In case of kafka or other input operators
the threads are less constrained. They can operate with independence and
can dictate the pace, limited only by back pressure. In this case the
operator is most likely going to be downstream in the DAG and would have
constraints for processing guarantees. For scalability, container local
could also be used as a substitute for multiple threads without resorting to
using separate containers. I can understand use of a separate thread to be
able to get around problems like stalled processing but would first try to
see if something like container local would work for scaling.


> It is also correct that generally there is no ordering guarantee within a
> streaming window, and that would be the case when multiple input ports are
> present as well. (The platform cannot guarantee such ordering, this would
> need to be done by the operator).



> Idempotency can be expensive (latency and/or space complexity), and not all
> applications need it (like certain use cases that process record by record
> and don't accumulate state). An example might be Python logic that is used
> for scoring against a model that was built offline. Idempotency would
> actually be rather difficult to implement, since the operator would need to
> remember which tuples were emitted in a given interval and on replay block
> until they are available (and also hold others that may be processed sooner
> than in the original order). It may be easier to record emitted tuples to a
> WAL instead of reprocessing.
>

Ordering cannot be guaranteed, but the operator would need to finish the
work it is given in a window within the window boundary, otherwise there is
a chance of data loss in recovery scenarios. You could make the checkpoint
the boundary by which all pending work is completed, instead of every
window boundary, but then downstream operators cannot rely on window level
idempotency for exactly once. Something like the file output operator would
work, but not our DB kind of operators. Both options could be supported in
the operator.



Re: [Discuss] Design of the python execution operator

2017-12-20 Thread Thomas Weise
It is exciting to see this move forward, the ability to use Python opens
many new possibilities.

Regarding use of worker threads, this is a pattern that we are using
elsewhere (for example in the Kafka input operator). When the operator
performs blocking operations and consumes little memory and/or CPU, then it
is more economic to first use threads to increase parallelism and
throughput (up to a limit), and then the more expensive containers for
horizontal scaling (multiple threads to make good use of container
resources and then scale using the usual partitioning).
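
A sketch of those two knobs ( the worker-thread count is a hypothetical
operator property; the partitioner attribute is the usual Apex mechanism ):

    // Scale up with threads first, then out with partitions.
    // Uses com.datatorrent.api.Context and
    // com.datatorrent.common.partitioner.StatelessPartitioner.
    PythonExecOperator py =
        dag.addOperator("python", new PythonExecOperator());
    py.setNumWorkerThreads(4);   // hypothetical per-instance thread pool
    dag.setAttribute(py, Context.OperatorContext.PARTITIONER,
        new StatelessPartitioner<PythonExecOperator>(3)); // 3 partitions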

It is also correct that generally there is no ordering guarantee within a
streaming window, and that would be the case when multiple input ports are
present as well. (The platform cannot guarantee such ordering, this would
need to be done by the operator).

Idempotency can be expensive (latency and/or space complexity), and not all
applications need it (like certain use cases that process record by record
and don't accumulate state). An example might be Python logic that is used
for scoring against a model that was built offline. Idempotency would
actually be rather difficult to implement, since the operator would need to
remember which tuples were emitted in a given interval and on replay block
until they are available (and also hold others that may be processed sooner
than in the original order). It may be easier to record emitted tuples to a
WAL instead of reprocessing.

Regarding not emitting stragglers until the next input arrives, can this
not be accomplished using IdleTimeHandler?

What is preventing the use of virtual environments?

Thanks,
Thomas



Re: [Discuss] Design of the python execution operator

2017-12-19 Thread Pramod Immaneni
Hi Ananth,

From your explanation, it looks like the threads overall allow you to
achieve two things. First, have some sort of overall timeout; if a tuple
doesn't finish processing by then, it is flagged as such. Second, it doesn't
block processing of subsequent tuples and you can still process them,
meeting the SLA. By checkpoint, however, I think you should try to have a
resolution one way or the other for all the tuples received within the
checkpoint period or every window boundary (see idempotency below),
otherwise, there is a chance of data loss in case of operator restarts. If
a loss is acceptable for stragglers you could let straggler processing
continue beyond the checkpoint boundary and let them finish when they can.
You could support both behaviors by use of a property. Furthermore, you may
not want all threads stuck with stragglers, and then you are back to square
one, so you may need to stop processing stragglers beyond a certain thread
usage threshold. Is there a way to interrupt the processing of the engine?
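
For illustration, if each engine call is wrapped in a task, interruption
could look like this sketch ( runOnJep is a hypothetical call; whether JEP
actually honours the interrupt flag is a separate question ):

    import java.util.concurrent.*;

    // Bound a Python call with a timeout and attempt to cancel on overrun.
    // A Jep instance must stay on its creating thread, hence one
    // single-thread executor per interpreter.
    Object scoreWithSla(ExecutorService interpreter, String expression,
        long slaMillis) throws InterruptedException, ExecutionException {
      Future<Object> result =
          interpreter.submit(() -> runOnJep(expression)); // hypothetical
      try {
        return result.get(slaMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        result.cancel(true);  // best effort: only interrupts the worker
        return null;          // caller treats the tuple as a straggler
      }
    }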

Then there is the question of idempotency. I suspect it would be difficult
to maintain it unless you wait for processing to finish for all tuples
received during the window at every window boundary. You may provide an
option for relaxing the strict guarantees for the stragglers as mentioned
above.

Pramod


Re: [Discuss] Design of the python execution operator

2017-12-14 Thread Ananth G
Hello Pramod,

Thanks for the comments. I adjusted the title of the JIRA. Here is what I was 
thinking for the worker pool implementation.

- The main reason ( which I forgot to mention in the design points below ) is 
that the java embedded engine allows only the thread that created the instance 
to execute the python logic. This is more because of the JNI specification 
itself. Some hints here 
https://stackoverflow.com/questions/18056347/jni-calling-java-from-c-with-multiple-threads
and here 
http://journals.ecs.soton.ac.uk/java/tutorial/native1.1/implementing/sync.html
- This essentially means that the main operator thread will have to call the 
python code execution logic if the design were otherwise.
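
To illustrate the constraint ( a sketch; the Jep class and method names are
per the JEP project, everything else is illustrative ):

    import jep.Jep;
    import jep.JepConfig;
    import jep.JepException;

    // The Jep instance is created, used and closed entirely on one thread,
    // because JNI ties the embedded interpreter to its creating thread.
    public class JepThreadDemo {
      public static void main(String[] args) {
        Thread interpreterThread = new Thread(() -> {
          try (Jep jep = new Jep(new JepConfig())) {
            jep.eval("x = 2 + 2");                  // runs on this thread
            System.out.println(jep.getValue("x"));  // prints 4
          } catch (JepException e) {
            e.printStackTrace();
          }
        });
        interpreterThread.start();
      }
    }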

- Since the end user can choose to write any kind of logic, including 
blocking I/O, as part of the implementation, I did not want to stall the 
operator thread for any usage pattern. 

- In fact there is only one overall interpreter in the JVM process space and 
the interpreter thread is just a JNI wrapper around it to account for the JNI 
limitations above.

- It is for the very same reason that there is an API in the implementation to 
support registering Shared Modules across all of the interpreter threads. 
Use cases for this exist when there is a global variable provided by the 
underlying Python library and loading it multiple times can cause issues. Hence 
the API to register a shared module which can be used by all of the Interpreter 
Threads. 
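
A sketch of how such a registration might look at the JEP level (
addSharedModules is per the JEP project; the operator-level API differs ):

    // Mark native-heavy modules as shared so they are loaded once per
    // process rather than once per sub-interpreter.
    // Uses jep.Jep and jep.JepConfig as in the earlier sketch.
    JepConfig config = new JepConfig();
    config.addSharedModules("numpy", "tensorflow");
    Jep jep = new Jep(config);  // again, on the owning interpreter thread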

- The operator submits to a work request queue and consumes from a response 
queue for each of the interpreter threads. There exists one request and one 
response queue per interpreter thread.

- The stragglers will get drained from the response queue for a previously 
submitted request. 

- The other reason why I chose to implement it this way is for some of the 
use cases that I foresee in ML scoring scenarios. In fraud systems, if I have 
a strict SLA to score a model, the main thread in the operator is not helping 
me implement this pattern at all. The caller to the Apex application will 
need to proceed if the scoring gets delayed for whatever reason. However the 
scoring can continue on the interpreter thread and can be drained later. 
( It is just that the caller did not make use of this result, but it can 
still be persisted by operators consuming from the straggler port. ) 

- There are 3 output ports for this operator: the DefaultOutputPort, a 
stragglersPort and an errorPort. 

- Some libraries like Tensorflow can become really heavy. Tensorflow models can 
execute a tensorflow DAG as part of a model scoring implementation, and hence I 
wanted to take the approach of a worker pool. Yes, your point is valid if we 
wait for the stragglers to complete in a given window. The current 
implementation does not force a wait for all of the stragglers to complete. 
The stragglers are emitted only when there is a new tuple being processed, 
i.e. when a new tuple arrives for scoring, the straggler response queue is 
checked for entries and, if there are any, the responses are emitted into the 
stragglerPort. This essentially means that there are situations when the 
straggler port is emitting the result for a request submitted in a previous 
window. This also implies that idempotency cannot be guaranteed across runs 
of the same input data. In fact, all threaded implementations have this 
issue, as ordering of the results is not guaranteed to be unique even within 
a given window?
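
A sketch of that drain-on-arrival behaviour ( illustrative names only ):

    // Called for each incoming tuple: first flush any late responses to
    // the straggler port, then hand the new tuple to a worker.
    public void process(Object tuple) {
      Object late;
      while ((late = stragglerResponses.poll()) != null) {
        stragglersPort.emit(late);    // may belong to an earlier window
      }
      submitToLeastBusyWorker(tuple); // hypothetical scheduling call
    }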

I can enforce a block/drain at the end of the window to force completion, 
based on the feedback. 


Regards,
Ananth


Re: [Discuss] Design of the python execution operator

2017-12-14 Thread Pramod Immaneni
Hi Ananth,

Sounds interesting and looks like you have put quite a bit of work on it.
Might I suggest changing the title of 2260 to better fit your proposal and
implementation, mainly so that there is differentiation from 2261.

I wanted to discuss the proposal to use multiple threads in an operator
instance. Unless the execution threads are blocking for some sort of i/o,
why would it result in a noticeable performance difference compared to
processing in the operator thread and running multiple partitions of the
operator in container local? By running the processing in a separate thread
from the operator lifecycle thread, you still don't get away from matching
the incoming data throughput. The checkpoint will act as a point where your
backpressure will start to materialize, when the operator would have to wait
for your background processing to complete to guarantee all data till the
checkpoint is processed.

Thanks


On Thu, Dec 14, 2017 at 2:20 AM, Ananth G  wrote:

> Hello All,
>
> I would like to submit the design for the Python execution operator before
> I raise the pull request so that I can refine the implementation based on
> feedback. Could you please provide feedback on the design, if any, and I
> will raise the PR accordingly.
>
> - This operator is for the JIRA ticket raised here:
> https://issues.apache.org/jira/browse/APEXMALHAR-2260
> - The operator embeds a python interpreter in the operator JVM process
> space and is not external to the JVM.
> - The implementation is proposing the use of Java Embedded Python ( JEP )
> given here: https://github.com/ninia/jep
> - The JEP engine is under the zlib/libpng license. Since this is an
> approved license under
> https://www.apache.org/legal/resolved.html#category-a I am assuming it is
> ok for the community to approve the inclusion of this library.
> - Python integration is a messy piece due to the nature of dynamic
> libraries. All python libraries need to be natively installed. This also
> means we will not be able to bundle python libraries and dependencies as
> part of the build into the target JVM container. Hence this operator has
> the current limitation that the python binaries must be installed through
> an external process on all of the YARN nodes for now.
> - The JEP maven dependency jar in the POM is a JNI wrapper around the
> dynamic library that is installed externally to the Apex installation
> process on all of the YARN nodes.
> - Hope to take up https://issues.apache.org/jira/browse/APEXCORE-796 to
> solve this issue in the future.
> - The python operator implementation can be extended to a py4J based
> implementation ( as opposed to an in-memory model like JEP ) in the future
> if required. JEP is the implementation based on an in-memory design
> pattern.
> - The python operator allows for 4 major API patterns ( a sketch follows
> at the end of this message )
> - Execute a method call by accepting parameters to pass to the
> interpreter
> - Execute a python script as given in a file path
> - Evaluate an expression and allows for passing of variables between
> the java code and the python in-memory interpreter bridge
> - A handy method wherein a series of instructions can be passed in one
> single java call ( executed as a sequence of python eval instructions under
> the hood )
> - Automatic garbage collection of the variables that are passed from java
> code to the in-memory python interpreter
> - Support for all major python libraries: Tensorflow, Keras, Scikit,
> xgboost. Preliminary tests for these libraries seem to work as per the
> code here:
> https://github.com/ananthc/sampleapps/tree/master/apache-apex/apexjvmpython
> - The implementation allows for an SLA based execution model, i.e. the
> operator is given a chance to execute the python code and, if it does not
> complete within a timeout, the operator code returns null.
> - A tuple that has become a straggler as per the previous point will
> automatically be drained off to a different port so that downstream
> operators can still consume the straggler if they want to when the results
> arrive.
> - Because python is an interpreter, if a previous tuple is still being
> processed, there is a chance of a back pressure pattern building up very
> quickly. Hence this operator works on the concept of a worker pool. The
> Python operator uses a configurable number of worker threads, each of
> which embeds the Python interpreter within its processing space, i.e. it
> is in fact a collection of python in-memory interpreters inside the Python
> operator implementation.
> - The operator chooses one of the threads at runtime based on their busy
> state, thus allowing back-pressure issues to be resolved automatically.
> - There is a
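
For concreteness, the four API patterns listed in the quoted design could map
onto JEP roughly as below ( a sketch; the Jep method names are per the JEP
project, the surrounding values are illustrative, and the jep instance must
live on its owning interpreter thread ):

    // 1. Execute a method call, passing parameters to the interpreter
    Object score = jep.invoke("score_model", features);

    // 2. Execute a python script given by a file path
    jep.runScript("/path/to/setup.py");

    // 3. Evaluate an expression, passing variables across the bridge
    jep.set("x", 41);
    jep.eval("y = x + 1");
    Object y = jep.getValue("y");

    // 4. A series of instructions in one java call ( a sequence of eval
    //    instructions under the hood )
    for (String line : new String[] { "import math", "z = math.sqrt(y)" }) {
      jep.eval(line);
    }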