>From the log snippet it is hard to tell. Flink is not only interacting with
GCS but also with ZooKeeper to store a pointer to the serialized JobGraph.
This can also take some time. Then of course, there could be an issue with
the GS filesystem implementation you are using. The fs throughput could
also change a bit with time. In the logs you shared, uploading of the blobs
takes at most 7s. In another run you stated that it would take 10s.

Have you tried whether the same behaviour is observable with the latest
Flink version?

Cheers,
Till

On Fri, Sep 4, 2020 at 6:44 AM Prakhar Mathur <prakha...@gojek.com> wrote:

> Yes, I will check that, but any pointers on why Flink is taking more time
> than gsutil upload?
>
> On Thu, Sep 3, 2020 at 10:14 PM Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> Hmm then it probably rules GCS out. What about ZooKeeper? Have you
>> experienced slow response times from your ZooKeeper cluster?
>>
>> Cheers,
>> Till
>>
>> On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur <prakha...@gojek.com>
>> wrote:
>>
>>> We tried uploading the same blob from Job Manager k8s pod directly to
>>> GCS using gsutils and it took 2 seconds. The upload speed was 166.8
>>> MiB/s. Thanks.
>>>
>>> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> The logs don't look suspicious. Could you maybe check what the write
>>>> bandwidth to your GCS bucket is from the machine you are running Flink on?
>>>> It should be enough to generate a 200 MB file and write it to GCS. Thanks a
>>>> lot for your help in debugging this matter.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur <prakha...@gojek.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks for the response. Yes, we are running Flink in HA mode. We
>>>>> checked there are no such quota limits for GCS for us. Please find the 
>>>>> logs
>>>>> below, here you can see the copying of blob started at 11:50:39,455 and it
>>>>> got JobGraph submission at 11:50:46,400.
>>>>>
>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>>>>> the idle timeout.
>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>>>>> the idle timeout.
>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>> 2020-09-01 11:50:39,455 DEBUG
>>>>> org.apache.flink.runtime.blob.FileSystemBlobStore             - Copying
>>>>> from
>>>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>>>>> to
>>>>> gs://<test-bucket>/blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>>>> 2020-09-01 11:50:43,904 DEBUG
>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>> ping response for sessionid: 0x30be3d929102460 after 2ms
>>>>> 2020-09-01 11:50:46,400 INFO
>>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received
>>>>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>> 2020-09-01 11:50:46,403 INFO
>>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - 
>>>>> Submitting
>>>>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>> 2020-09-01 11:50:46,405 DEBUG
>>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
>>>>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Trigger heartbeat request.
>>>>> 2020-09-01 11:50:47,330 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>> 2020-09-01 11:50:47,331 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>> notification sessionid:0x30be3d929102460
>>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>> WatchedEvent state:SyncConnected type:NodeChildrenChanged
>>>>> path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
>>>>> 2020-09-01 11:50:52,882 INFO
>>>>>  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>>> Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.
>>>>>
>>>>> Thank You.
>>>>>
>>>>> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Prakhar,
>>>>>>
>>>>>> have you enabled HA for your cluster? If yes, then Flink will try to
>>>>>> store the job graph to the configured high-availability.storageDir in 
>>>>>> order
>>>>>> to be able to recover it. If this operation takes long, then it is either
>>>>>> the filesystem which is slow or storing the pointer in ZooKeeper. If it 
>>>>>> is
>>>>>> the filesystem, then I would suggest to check whether you have some
>>>>>> read/write quotas which might slow the operation down.
>>>>>>
>>>>>> If you haven't enabled HA or persisting the jobGraph is not what
>>>>>> takes long, then the next most likely candidate is the recovery from a
>>>>>> previous checkpoint. Here again, Flink needs to read from the remote
>>>>>> storage (in your case GCS). Depending on the size of the checkpoint and 
>>>>>> the
>>>>>> read bandwidth, this can be faster or slower. The best way to figure out
>>>>>> what takes long is to share the logs with us so that we can confirm what
>>>>>> takes long.
>>>>>>
>>>>>> To sum it up, the job submission is most likely slow because of the
>>>>>> interplay of Flink with the external system (most likely your configured
>>>>>> filesystem). If the filesystem is somewhat throttled, then Flink cannot 
>>>>>> do
>>>>>> much about it.
>>>>>>
>>>>>> What you could try to do is to check whether your jar contains
>>>>>> dependencies which are not needed (e.g. Flink dependencies which are
>>>>>> usually provided by the system). That way you could decrease the size of
>>>>>> the jar a bit.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur <prakha...@gojek.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We are currently running Flink 1.9.0. We see a delay of around 20
>>>>>>> seconds in order to start a job on a session Flink cluster. We start the
>>>>>>> job using Flink's monitoring REST API where our jar is already uploaded 
>>>>>>> on
>>>>>>> Job Manager. Our jar file size is around 200 MB. We are using memory 
>>>>>>> state
>>>>>>> backend having GCS as remote storage.
>>>>>>>
>>>>>>> On running the cluster in debug mode, we observed that generating
>>>>>>> the plan itself takes around 6 seconds and copying job graph from local 
>>>>>>> to
>>>>>>> the remote folder takes around 10 seconds.
>>>>>>>
>>>>>>> We were wondering whether this delay is expected or if it can be
>>>>>>> reduced via tweaking any configuration?
>>>>>>>
>>>>>>> Thank you. Regards
>>>>>>> Prakhar Mathur
>>>>>>>
>>>>>>

Reply via email to