Hello,

I have a problem that I hope someone can give me a hint about.
I am running a Flink *standalone* cluster in 2 Docker containers (1
JobManager and 1 TaskManager); the TaskManager has 30 GB of RAM.

The dataset is a large one: SNAP Friendster, which has around 1800 M edges.
https://snap.stanford.edu/data/com-Friendster.html

I am trying to run the Gelly built-in label propagation algorithm on top of
it.
Since the dataset is very large, I believe the job exceeds the available
RAM and Flink starts spilling to secondary storage, which then fails:


Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
12/01/2016 17:58:24    Job execution switched to status RUNNING.
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
12/01/2016 17:59:51    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
*java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device*
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
    at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
    at java.lang.Thread.run(Thread.java:745)
*Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device*
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
    at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)


I do not have secondary storage limitations on the host system, so I
believe the system would be able to handle whatever is spilled to the
disk...
Perhaps this is a Docker limitation regarding the usage of the host's
secondary storage?

Or is there perhaps some configuration or setting for the TaskManager which
I am missing?
Running the label propagation of Gelly on this dataset and cluster
configuration, what would be the expected behavior if the system consumes
all the memory?
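
To test the Docker hypothesis, a small program like the one below could be run inside the TaskManager container to see how much space the JVM can actually use in the spill directory. This is only a sketch: the class name TmpSpaceCheck is made up for illustration, and it assumes that Flink spills to the JVM's java.io.tmpdir unless the taskmanager.tmp.dirs setting points elsewhere, which I have not verified for my setup.

```java
import java.io.File;

final class TmpSpaceCheck {

    /** Returns the usable bytes on the partition that holds the given directory. */
    static long usableBytes(String dir) {
        return new File(dir).getUsableSpace();
    }

    public static void main(String[] args) {
        // Assumption: spill files go to java.io.tmpdir unless a directory
        // (for example the one set in taskmanager.tmp.dirs) is passed as an argument.
        String dir = args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir");
        long bytes = usableBytes(dir);
        System.out.printf("%s: %.1f GiB usable%n", dir, bytes / (1024.0 * 1024 * 1024));
    }
}
```

If the number reported inside the container is much smaller than the free space on the host, the container's filesystem would be the limit rather than Flink itself.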


I believe the SortMerger thread is associated with the following mechanism
described in this blog post:

https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
*The Sort-Merge-Join works by first sorting both input data sets on their
join key attributes (Sort Phase) and merging the sorted data sets as a
second step (Merge Phase). The sort is done in-memory if the local
partition of a data set is small enough. Otherwise, an external merge-sort
is done by collecting data until the working memory is filled, sorting it,
writing the sorted data to the local filesystem, and starting over by
filling the working memory again with more incoming data. After all input
data has been received, sorted, and written as sorted runs to the local
file system, a fully sorted stream can be obtained. This is done by reading
the partially sorted runs from the local filesystem and sort-merging the
records on the fly. Once the sorted streams of both inputs are available,
both streams are sequentially read and merge-joined in a zig-zag fashion by
comparing the sorted join key attributes, building join element pairs for
matching keys, and advancing the sorted stream with the lower join key.*
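
To make the sort phase from the quote concrete, here is a minimal sketch of an external merge-sort over long values. It is not Flink's UnilateralSortMerger; the class ExternalSortSketch and the maxInMemory parameter are hypothetical names, and the merged result is collected into a list only to keep the example short (a real engine would stream it).

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

final class ExternalSortSketch {

    /** Sorts the input while buffering at most maxInMemory records; full buffers are spilled as sorted runs. */
    static List<Long> sort(Iterator<Long> input, int maxInMemory) {
        if (maxInMemory <= 0) throw new IllegalArgumentException("maxInMemory must be positive");
        try {
            List<Path> runs = new ArrayList<>();
            List<Long> buffer = new ArrayList<>(maxInMemory);
            while (input.hasNext()) {
                buffer.add(input.next());
                if (buffer.size() == maxInMemory) {   // working memory is full
                    runs.add(spill(buffer));
                    buffer.clear();                   // start over with more incoming data
                }
            }
            if (!buffer.isEmpty()) runs.add(spill(buffer));
            return merge(runs);
        } catch (IOException e) {
            // "No space left on device" would surface here as an IOException.
            throw new UncheckedIOException(e);
        }
    }

    /** Sorts the buffer in memory and writes it to a temporary file as one sorted run. */
    private static Path spill(List<Long> buffer) throws IOException {
        Collections.sort(buffer);
        Path run = Files.createTempFile("sorted-run-", ".bin");
        try (DataOutputStream out = new DataOutputStream(
                new BufferedOutputStream(Files.newOutputStream(run)))) {
            for (long value : buffer) out.writeLong(value);
        }
        return run;
    }

    /** Read cursor over one sorted run; head holds the current smallest unread value. */
    private static final class RunCursor implements Closeable {
        final DataInputStream in;
        long head;

        RunCursor(Path run) throws IOException {
            in = new DataInputStream(new BufferedInputStream(Files.newInputStream(run)));
        }

        boolean advance() throws IOException {
            try {
                head = in.readLong();
                return true;
            } catch (EOFException endOfRun) {
                return false;
            }
        }

        @Override
        public void close() throws IOException {
            in.close();
        }
    }

    /** Merges all sorted runs on the fly, always emitting the smallest head value. */
    private static List<Long> merge(List<Path> runs) throws IOException {
        PriorityQueue<RunCursor> heap =
                new PriorityQueue<>(Comparator.comparingLong((RunCursor c) -> c.head));
        List<Long> sorted = new ArrayList<>();
        try {
            for (Path run : runs) {
                RunCursor cursor = new RunCursor(run);
                if (cursor.advance()) heap.add(cursor); else cursor.close();
            }
            while (!heap.isEmpty()) {
                RunCursor cursor = heap.poll();
                sorted.add(cursor.head);
                if (cursor.advance()) heap.add(cursor); else cursor.close();
            }
        } finally {
            for (RunCursor cursor : heap) cursor.close();
            for (Path run : runs) Files.deleteIfExists(run);
        }
        return sorted;
    }
}
```

In this sketch every record that passes through a full buffer is written to disk exactly once, so the temporary files together take roughly as much space as the serialized input.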

I am still investigating the possibility that Docker is at fault regarding
secondary storage limitations, but how would I go about estimating the
amount of disk space needed for this spilling on this dataset?
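
My own rough attempt at such an estimate is sketched below. It rests on assumptions I cannot confirm: that each edge is serialized as two 8-byte long IDs with no edge value (16 bytes), and that the spilled data is about as large as the serialized input, possibly multiplied by the number of times the job materializes the edge set. The class SpillEstimate and its parameters are hypothetical.

```java
final class SpillEstimate {

    /** Estimated spill size in bytes: records x bytes per record x materialized copies. */
    static long estimateBytes(long records, int bytesPerRecord, int materializedCopies) {
        long perCopy = Math.multiplyExact(records, (long) bytesPerRecord);
        return Math.multiplyExact(perCopy, (long) materializedCopies);
    }

    public static void main(String[] args) {
        long edges = 1_800_000_000L; // "around 1800 M edges", as stated above
        int bytesPerEdge = 16;       // assumption: two long vertex IDs, no edge value
        for (int copies = 1; copies <= 2; copies++) {
            long bytes = estimateBytes(edges, bytesPerEdge, copies);
            System.out.printf("%d copy/copies: %.1f GB%n", copies, bytes / 1e9);
        }
    }
}
```

Under these assumptions a single copy of the edge set is already 28.8 GB, close to the TaskManager's 30 GB of RAM, so spilling would be expected even before any intermediate results are counted.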

Thanks for your time,

My best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra
