Yes, we can try the same in 1.11. Meanwhile, is there any network- or
thread-related config that we can tweak for this?

On Fri, Sep 4, 2020 at 12:48 PM Till Rohrmann <trohrm...@apache.org> wrote:

> From the log snippet it is hard to tell. Flink is not only interacting
> with GCS but also with ZooKeeper, where it stores a pointer to the serialized
> JobGraph. This can also take some time. Then of course, there could be an
> issue with the GCS filesystem implementation you are using. The fs
> throughput could also vary a bit over time. In the logs you shared,
> uploading the blobs takes at most 7s; in another run you stated that it
> took 10s.
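>
> If you want to see that pointer yourself, it lives under the job graph path
> shown in your logs; a rough sketch with the ZooKeeper CLI (host is a
> placeholder, assuming the default client port):
>
>   zkCli.sh -server <zk-host>:2181
>   # the job graph pointer nodes, as seen in your logs
>   ls /flink/cluster/jobgraphs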
>
> Have you checked whether the same behaviour is observable with the latest
> Flink version?
>
> Cheers,
> Till
>
> On Fri, Sep 4, 2020 at 6:44 AM Prakhar Mathur <prakha...@gojek.com> wrote:
>
>> Yes, I will check that, but do you have any pointers on why Flink is taking
>> more time than a plain gsutil upload?
>>
>> On Thu, Sep 3, 2020 at 10:14 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hmm, that probably rules GCS out then. What about ZooKeeper? Have you
>>> experienced slow response times from your ZooKeeper cluster?
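>>>
>>> A quick, rough check, assuming the ZooKeeper four-letter-word commands are
>>> enabled on your ensemble (host and port below are placeholders); both
>>> report the min/avg/max request latency seen by that server:
>>>
>>>   echo stat | nc <zk-host> 2181
>>>   echo mntr | nc <zk-host> 2181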
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur <prakha...@gojek.com>
>>> wrote:
>>>
>>>> We tried uploading the same blob from the Job Manager k8s pod directly to
>>>> GCS using gsutil and it took 2 seconds. The upload speed was 166.8
>>>> MiB/s. Thanks.
>>>>
>>>> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> The logs don't look suspicious. Could you maybe check what the write
>>>>> bandwidth to your GCS bucket is from the machine you are running Flink on?
>>>>> It should be enough to generate a 200 MB file and write it to GCS. Thanks a
>>>>> lot for your help in debugging this matter.
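>>>>>
>>>>> Something along these lines should do; a minimal sketch, assuming gsutil
>>>>> is available on that machine, and with the bucket name as a placeholder:
>>>>>
>>>>>   # create a 200 MB test file and time the upload
>>>>>   dd if=/dev/zero of=/tmp/bw-test.bin bs=1M count=200
>>>>>   time gsutil cp /tmp/bw-test.bin gs://<your-bucket>/bw-test.bin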
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur <prakha...@gojek.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the response. Yes, we are running Flink in HA mode. We checked,
>>>>>> and there are no such quota limits on GCS for us. Please find the logs
>>>>>> below; you can see that copying of the blob started at 11:50:39,455 and the
>>>>>> JobGraph submission was received at 11:50:46,400.
>>>>>>
>>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>>> Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded
>>>>>> the idle timeout.
>>>>>> 2020-09-01 11:50:37,061 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl  -
>>>>>> Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded
>>>>>> the idle timeout.
>>>>>> 2020-09-01 11:50:37,062 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:37,305 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>>> 2020-09-01 11:50:37,354 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>>> 2020-09-01 11:50:39,455 DEBUG
>>>>>> org.apache.flink.runtime.blob.FileSystemBlobStore             - Copying
>>>>>> from
>>>>>> /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426
>>>>>> to
>>>>>> gs://<test-bucket>/blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>>>>> 2020-09-01 11:50:43,904 DEBUG
>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>>> ping response for sessionid: 0x30be3d929102460 after 2ms
>>>>>> 2020-09-01 11:50:46,400 INFO
>>>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received
>>>>>> JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>>> 2020-09-01 11:50:46,403 INFO
>>>>>>  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - 
>>>>>> Submitting
>>>>>> job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>>> 2020-09-01 11:50:46,405 DEBUG
>>>>>> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>>>> Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to
>>>>>> flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,325 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,330 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>>> 2020-09-01 11:50:47,331 DEBUG
>>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>> Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>>> notification sessionid:0x30be3d929102460
>>>>>> 2020-09-01 11:50:52,880 DEBUG
>>>>>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>>>>>> WatchedEvent state:SyncConnected type:NodeChildrenChanged
>>>>>> path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
>>>>>> 2020-09-01 11:50:52,882 INFO
>>>>>>  org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore  -
>>>>>> Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.
>>>>>>
>>>>>> Thank You.
>>>>>>
>>>>>> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann <trohrm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Prakhar,
>>>>>>>
>>>>>>> have you enabled HA for your cluster? If yes, then Flink will try to
>>>>>>> store the job graph to the configured high-availability.storageDir in order
>>>>>>> to be able to recover it. If this operation takes long, then either the
>>>>>>> filesystem is slow or storing the pointer in ZooKeeper is what takes the
>>>>>>> time. If it is the filesystem, then I would suggest checking whether you
>>>>>>> have some read/write quotas which might slow the operation down.
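>>>>>>>
>>>>>>> For reference, these are the HA settings involved; the values below are
>>>>>>> only placeholders for illustration, not a recommendation:
>>>>>>>
>>>>>>>   high-availability: zookeeper
>>>>>>>   high-availability.zookeeper.quorum: <zk-1>:2181,<zk-2>:2181,<zk-3>:2181
>>>>>>>   # serialized job graphs/blobs are written here; ZooKeeper keeps pointers
>>>>>>>   high-availability.storageDir: gs://<bucket>/flink/ha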
>>>>>>>
>>>>>>> If you haven't enabled HA, or persisting the JobGraph is not what takes
>>>>>>> long, then the next most likely candidate is the recovery from a previous
>>>>>>> checkpoint. Here again, Flink needs to read from the remote storage (in
>>>>>>> your case GCS). Depending on the size of the checkpoint and the read
>>>>>>> bandwidth, this can be faster or slower. The best way to figure out where
>>>>>>> the time is spent is to share the logs with us so that we can confirm it.
>>>>>>>
>>>>>>> To sum it up, the job submission is most likely slow because of the
>>>>>>> interplay of Flink with the external system (most likely your configured
>>>>>>> filesystem). If the filesystem is somewhat throttled, then Flink cannot do
>>>>>>> much about it.
>>>>>>>
>>>>>>> What you could try is checking whether your jar contains dependencies
>>>>>>> which are not needed (e.g. Flink dependencies which are usually provided by
>>>>>>> the system). That way you could decrease the size of the jar a bit.
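>>>>>>>
>>>>>>> If you build with Maven, marking the Flink dependencies as provided keeps
>>>>>>> them out of the fat jar; a rough sketch (artifact id, Scala suffix and
>>>>>>> version depend on your build):
>>>>>>>
>>>>>>>   <dependency>
>>>>>>>     <groupId>org.apache.flink</groupId>
>>>>>>>     <artifactId>flink-streaming-java_2.11</artifactId>
>>>>>>>     <version>1.9.0</version>
>>>>>>>     <!-- provided: available on the cluster, excluded from the fat jar -->
>>>>>>>     <scope>provided</scope>
>>>>>>>   </dependency>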
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur <prakha...@gojek.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We are currently running Flink 1.9.0. We see a delay of around 20
>>>>>>>> seconds when starting a job on a session Flink cluster. We start the job
>>>>>>>> using Flink's monitoring REST API, with our jar already uploaded to the
>>>>>>>> Job Manager. Our jar file size is around 200 MB. We are using the memory
>>>>>>>> state backend with GCS as remote storage.
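>>>>>>>>
>>>>>>>> For context, the submission request we issue is roughly of this shape
>>>>>>>> (host, port, and jar id below are placeholders):
>>>>>>>>
>>>>>>>>   curl -X POST http://<jobmanager>:8081/jars/<jar-id>/run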
>>>>>>>>
>>>>>>>> On running the cluster in debug mode, we observed that generating the
>>>>>>>> plan itself takes around 6 seconds and copying the job graph from local to
>>>>>>>> the remote folder takes around 10 seconds.
>>>>>>>>
>>>>>>>> We were wondering whether this delay is expected, or whether it can be
>>>>>>>> reduced by tweaking some configuration.
>>>>>>>>
>>>>>>>> Thank you. Regards
>>>>>>>> Prakhar Mathur
>>>>>>>>
>>>>>>>
