Hello Chesnay,

Thanks for the advice. I've begun adding multiple jobs per Python plan file
here: https://issues.apache.org/jira/browse/FLINK-5183 and
https://github.com/GEOFBOT/flink/tree/FLINK-5183

The functionality of the patch works. I am able to run multiple jobs per
file successfully, but the process doesn't exit once the jobs are done.
This is because the main issue that I am encountering is that although I
added a check for PythonPlanBinder to check for more jobs from the Python
process until the Python process exits, there is a race condition where
Python process usually exits after Java checks to see if it is still
running. Therefore, unless you use a debugger to pause the Java process
until the Python process exits or some other phenomenon happens where
Python exits fast enough, the Java process thinks that the Python process
is still alive and will end up waiting indefinitely for more Python jobs.

Another one minor comment I had with my own patch was that I used global
variables to keep track of the number of execution environments and to
differentiate between different environments. Is there a better way to do
this?

Thanks!

Cheers,
Geoffrey

On Wed, Nov 23, 2016 at 5:41 AM, Chesnay Schepler <ches...@apache.org>
wrote:

Hello,

implementing collect() in python is not that trivial and the gain is
questionable. There is an inherent size limit (think 10mb), and it is
a bit at odds with the deployment model of the Python API.

Something easier would be to execute each iteration of the for-loop as a
separate job and save the result in a file.
Note that right now the Pyhton API can't execute multiple jobs from the
same file; we would need some modifications
in the PythonPlanBinder to allow this.

Regards,
Chesnay


On 20.11.2016 23:54, Geoffrey Mon wrote:

Hello,

I know that the reuse of the data set in my plan is causing the problem
(after one dictionary atom is learned using the data set "S", "S" is
updated for use with the next dictionary atom). When I comment out the line
updating the data set "S", I have no problem and the plan processing phase
takes substantially less time.

I assume that this is because updating and reusing "S" makes the graph of
transformations much more complicated and forces the optimizer to do much
more work, since for example the final value of "S" depends on all previous
operations combined. Is there a way to replace the for loop in my plan so
that I don't cause this complication and so that memory usage is
manageable? I considered making "S" an iterative data set, but I need to
save each dictionary atom to a file, and I wouldn't be able to do that if
"S" was iterative and not finalized.

Perhaps I would be able to collect "S" at the end of each dictionary atom
and then make the new "S" directly from these values. This however would
require that "collect" be implemented in the Python API.

In addition, I don't think the problem is YARN-specific anymore because I
have been able to reproduce it on a local machine.

Cheers,
Geoffrey

On Mon, Nov 14, 2016 at 11:38 AM Geoffrey Mon <geof...@gmail.com> wrote:

Hi Ufuk,

The master instance of the cluster was also a m3.xlarge instance with 15 GB
RAM, which I would've expected to be enough. I have gotten the program to
run successfully on a personal virtual cluster where each node has 8 GB RAM
and where the master node was also a worker node, so the problem appears to
have something to do with YARN's memory behavior (such as on EMR).

Nevertheless, it would probably be a good idea to modify my code to reduce
its memory usage. When running my code on my local cluster, performance was
probably bottlenecked.

The job does use a for loop to run the core operations for a specific
number of times, specified as a command line parameter. If it helps, here
is my code:
Python: https://github.com/quinngroup/pyflink-r1dl/blob/master/R1DL_Flink.py
(L260
is the core for loop)
Java:
https://github.com/quinngroup/flink-r1dl/blob/master/src/main/java/com/github/quinngroup/R1DL.java
(L120
is the core for loop)
I would expect the join operations to be a big cause of the excessive
memory usage.

Thanks!

Geoffrey


On Mon, Nov 14, 2016 at 5:13 AM Ufuk Celebi <u...@apache.org> wrote:

The Python API is in alpha state currently, so we would have to check if it
is related specifically to that. Looping in Chesnay who worked on that.

The JVM GC error happens on the client side as that's where the optimizer
runs. How much memory does the client submitting the job have?

How do you compose the job? Do you have nested loops, e.g. for() { ... bulk
iteration Flink program }?

– Ufuk

On 14 November 2016 at 08:02:26, Geoffrey Mon ( <geof...@gmail.com>
geof...@gmail.com) wrote:
> Hello all,
>
> I have a pretty complicated plan file using the Flink Python API running
on
> a AWS EMR cluster of m3.xlarge instances using YARN. The plan is for a
> dictionary learning algorithm and has to run a sequence of operations many
> times; each sequence involves bulk iterations with join operations and
> other more intensive operations, and depends on the results of the
previous
> sequence. I have found that when the number of times to run this sequence
> of operations is high (e.g. 20) I get this exception:
>
> Uncaught error from thread
> [flink-akka.remote.default-remote-dispatcher-7] shutting down JVM
> since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> java.lang.StackOverflowError
> at
java.io.ObjectOutputStream$ReplaceTable.lookup(ObjectOutputStream.java:2399)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1113)
> at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> ..............................
>
> I assume this problem is caused by having to send too many serialized
> operations between Java and Python. When using a Java implementation of
the
> same operations, I also get:
>
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at
org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:106)
> at
org.apache.flink.optimizer.plan.PlanNode.mergeBranchPlanMaps(PlanNode.java:99)
> at
org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:90)
> at
org.apache.flink.optimizer.plan.DualInputPlanNode.(DualInputPlanNode.java:69)
> at
org.apache.flink.optimizer.operators.HashJoinBuildSecondProperties.instantiate(HashJoinBuildSecondProperties.java:81)
> at
org.apache.flink.optimizer.dag.TwoInputNode.instantiate(TwoInputNode.java:607)
> ............
>
> The problem seems to caused by YARN's handling of memory, because I have
> gotten the same Python implementation to work on a smaller, local virtual
> cluster that is not using YARN, even though my local cluster has far fewer
> computing resources than the 15 GB RAM m3.xlarge AWS instances that EMR is
> using. After the YARN job has failed, sometimes a python process is left
on
> the cluster using up most of the RAM.
>
> How can I solve this issue? I am unsure of how to reduce the number of
> operations while keeping the same functionality.
>
> Thanks,
> Geoffrey
>





-- 
Geoffrey Mon

*"Are you going to lay there and get killed, or get up and do something
about it?"* *—Unidentified Lieutenant at Easy Red sector, Omaha Beach*

Reply via email to