It was due to the parallelism being too low.
I increased the parallelism (I actually set it to the total number of task
slots on the cluster), which makes restoring from a savepoint much faster.
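For reference, raising the job-wide parallelism can be done roughly as in the
sketch below (not our exact code; 56 is a placeholder for the total slot count):

  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  // placeholder value: total number of task slots in the cluster
  env.setParallelism(56);

The same value can also be passed with "-p" to "flink run" or set via
"parallelism.default" in flink-conf.yaml.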

This is somewhat related to the previous discussion I had with Robert and 
Aljoscha.
Having a standalone cluster consisting of 7 taskmanagers, I wanted to schedule
the session window tasks (each of which holds large state in memory) evenly
over the 7 taskmanagers.
To that end, I had to set the parallelism of the session window operator to 7,
as a single core seems enough for the simple computation logic of our
application.
Luckily, 7 is smaller than ExecutionVertex.MAX_DISTINCT_LOCATIONS_TO_CONSIDER,
which is hard-coded to 8, so the 7 tasks happen to be scheduled evenly over
the 7 taskmanagers.
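For context, the session window part of the job looks roughly like the sketch
below (placeholder source, key, gap, and aggregation; not our exact code):

  import org.apache.flink.api.java.tuple.Tuple2;
  import org.apache.flink.streaming.api.TimeCharacteristic;
  import org.apache.flink.streaming.api.datastream.DataStream;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
  import org.apache.flink.streaming.api.windowing.time.Time;

  public class SessionWindowSketch {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

      // Placeholder source; the real job reads from Kafka.
      DataStream<Tuple2<String, Long>> events =
          env.fromElements(Tuple2.of("user-1", 1000L), Tuple2.of("user-2", 2000L));

      events
          // event time with bounded out-of-orderness, as in the real job
          .assignTimestampsAndWatermarks(
              new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element) {
                  return element.f1;  // placeholder timestamp field
                }
              })
          .keyBy(0)                                                   // key by user id (placeholder)
          .window(EventTimeSessionWindows.withGap(Time.minutes(30)))  // gap is a placeholder
          .sum(1)                                                     // stands in for our simple aggregation
          .setParallelism(7)                                          // one session window task per taskmanager
          .print();

      env.execute("session-window-sketch");
    }
  }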
However, I found that, when restoring from a savepoint, each of the 7 HDFS
clients (the 7 session window tasks) reads a large file (operator state) from
HDFS at only 4MB per second for a long period of time.
The slow speed seems to result from a single thread reading and deserializing
each state entry from the state stored on HDFS.
So I use up the total number of task slots for a single streaming job on the
standalone cluster.
Note that, if I set the number of session window tasks to somewhere between 7
and the total number of task slots, the tasks are scheduled on only a few
taskmanagers and those taskmanagers die due to lack of memory. And as I do not
have SSDs, I prefer FsStateBackend over RocksDBStateBackend.
Of course, the standalone cluster cannot be shared across multiple jobs as we
no longer have free slots.
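For reference, the state backend is set up roughly like this (a sketch, where
env is the StreamExecutionEnvironment from above and the HDFS URI is a
placeholder):

  import org.apache.flink.runtime.state.filesystem.FsStateBackend;

  // Keyed state lives on the heap; checkpoints/savepoints are written to HDFS,
  // so no local SSDs are required (unlike RocksDBStateBackend, which keeps
  // state on local disk).
  env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));

The same can also be configured with "state.backend: filesystem" in
flink-conf.yaml.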

As it seems like the GA release of flink-1.5.0 is around the corner, I am
diverting my attention to Mesos and FLIP-6 for per-job clusters.
One concern is that, AFAIK, multiple taskmanagers can be scheduled on the same
node on Mesos.

@Eron Is this still an unsolved issue?
If so, do you think it would require a lot of work to add Fenzo's uniqueness
constraint to Flink's Mesos ResourceManager?
I'd like to open an issue and figure it out (hopefully with your kind advice).
P.S. This time my Flink application has nothing to do with GPUs.

Best,

- Dongwon

> On Apr 2, 2018, at 3:33 PM, Dongwon Kim <eastcirc...@gmail.com> wrote:
> 
> Attached is a log file from a taskmanager.
> Please take a look at the log file considering the below events:
> - Around 01:10:47 : the job is submitted to the job manager.
> - Around 01:16:30 : the source suddenly starts to read from Kafka and the 
> sink starts to write data to Kafka
> 
> Any help would be greatly appreciated! T.T
> 
> Best,
> - Dongwon
> 
> <tm.log>
> 
>> On Apr 2, 2018, at 2:30 PM, Dongwon Kim <eastcirc...@gmail.com> wrote:
>> 
>> Hi,
>> 
>> While restoring from the latest checkpoint starts immediately after the job 
>> is restarted, restoring from a savepoint takes more than five minutes until 
>> the job makes progress.
>> During this blackout, I cannot observe any resource usage across the cluster.
>> After that period, I can see from various metrics, including 
>> flink_taskmanager_job_task_currentLowWatermark, that Flink tries to catch up 
>> with the progress in the source topic.
>> 
>> FYI, I'm using
>> - Flink-1.4.2
>> - FsStateBackend configured with HDFS
>> - EventTime with BoundedOutOfOrdernessTimestampExtractor
>> 
>> A single checkpoint/savepoint is ~50GB in size, and we have 7 servers for 
>> taskmanagers.
>> 
>> Best,
>> 
>> - Dongwon
> 
