Yes, we can try the same in 1.11. Meanwhile, is there any network- or thread-related config that we can tweak for this?
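
For context, the only knobs we have found so far are on the BlobServer side of flink-conf.yaml. Below is just a sketch of what we were considering; the values are the documented defaults as far as we know, and we are not sure any of them actually affects the copy to GCS:

    # sketch only -- values are the defaults we believe Flink ships with
    blob.storage.directory: /tmp/flink-blobs           # local staging dir before the copy to the HA store
    blob.fetch.num-concurrent: 50                      # concurrent blob fetches served by the BlobServer
    blob.fetch.backlog: 1000                           # queued blob fetch requests
    high-availability.storageDir: gs://<test-bucket>/  # where the job graph blobs end up

If there is a network-level setting at all, we suspect it would be on the GCS connector side (for example the resumable-upload chunk size of the Hadoop gcs-connector, assuming that is the implementation we are on) rather than in Flink itself.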
On Fri, Sep 4, 2020 at 12:48 PM Till Rohrmann <trohrm...@apache.org> wrote:

> From the log snippet it is hard to tell. Flink is not only interacting
> with GCS but also with ZooKeeper to store a pointer to the serialized
> JobGraph. This can also take some time. Then of course, there could be an
> issue with the GCS filesystem implementation you are using. The fs
> throughput could also vary a bit over time. In the logs you shared,
> uploading of the blobs takes at most 7s. In another run you stated that it
> would take 10s.
>
> Have you tried whether the same behaviour is observable with the latest
> Flink version?
>
> Cheers,
> Till
>
> On Fri, Sep 4, 2020 at 6:44 AM Prakhar Mathur <prakha...@gojek.com> wrote:
>
>> Yes, I will check that, but any pointers on why Flink is taking more
>> time than a gsutil upload?
>>
>> On Thu, Sep 3, 2020 at 10:14 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hmm, then that probably rules GCS out. What about ZooKeeper? Have you
>>> experienced slow response times from your ZooKeeper cluster?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 3, 2020 at 6:23 PM Prakhar Mathur <prakha...@gojek.com> wrote:
>>>
>>>> We tried uploading the same blob from the Job Manager k8s pod directly
>>>> to GCS using gsutil and it took 2 seconds. The upload speed was
>>>> 166.8 MiB/s. Thanks.
>>>>
>>>> On Wed, Sep 2, 2020 at 6:14 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> The logs don't look suspicious. Could you maybe check what the write
>>>>> bandwidth to your GCS bucket is from the machine you are running
>>>>> Flink on? It should be enough to generate a 200 MB file and write it
>>>>> to GCS. Thanks a lot for your help in debugging this matter.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Sep 2, 2020 at 1:04 PM Prakhar Mathur <prakha...@gojek.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks for the response. Yes, we are running Flink in HA mode. We
>>>>>> checked and there are no such quota limits on GCS for us. Please find
>>>>>> the logs below; here you can see that copying the blob started at
>>>>>> 11:50:39,455 and the JobGraph submission was received at 11:50:46,400.
>>>>>>
>>>>>> 2020-09-01 11:50:37,061 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Release TaskExecutor 2e20ee286a3fee5831fefc0ab427ba92 because it exceeded the idle timeout.
>>>>>> 2020-09-01 11:50:37,061 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Worker 4e4ae8b90f911787ac112c2847759512 could not be stopped.
>>>>>> 2020-09-01 11:50:37,062 DEBUG org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Release TaskExecutor 032a62eff2a7d8067f25b8fc943f262f because it exceeded the idle timeout.
>>>>>> 2020-09-01 11:50:37,062 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Worker 2532ed667a2566e06c3d9e3bc85c6ed6 could not be stopped.
>>>>>> 2020-09-01 11:50:37,305 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:37,305 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:37,354 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>>> 2020-09-01 11:50:37,354 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>>> 2020-09-01 11:50:39,455 DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore - Copying from /tmp/flink-blobs/blobStore-6e468470-ba5f-4ea0-a8fd-cf31af663f11/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426 to gs://<test-bucket>/blob/job_980d3ff229b7fbfe889e2bc93e526da0/blob_p-90776d1fe82af438f6fe2c4385461fe6cb96d25a-86f63972fbf1e10b502f1640fe01b426.
>>>>>> 2020-09-01 11:50:43,904 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x30be3d929102460 after 2ms
>>>>>> 2020-09-01 11:50:46,400 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>>> 2020-09-01 11:50:46,403 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 980d3ff229b7fbfe889e2bc93e526da0 (cli-test-001).
>>>>>> 2020-09-01 11:50:46,405 DEBUG org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Adding job graph 980d3ff229b7fbfe889e2bc93e526da0 to flink/cluster/jobgraphs/980d3ff229b7fbfe889e2bc93e526da0.
>>>>>> 2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,325 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Trigger heartbeat request.
>>>>>> 2020-09-01 11:50:47,330 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 4e4ae8b90f911787ac112c2847759512.
>>>>>> 2020-09-01 11:50:47,331 DEBUG org.apache.flink.runtime.resourcemanager.StandaloneResourceManager - Received heartbeat from 2532ed667a2566e06c3d9e3bc85c6ed6.
>>>>>> 2020-09-01 11:50:52,880 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got notification sessionid:0x30be3d929102460
>>>>>> 2020-09-01 11:50:52,880 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Got WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/flink/cluster/jobgraphs for sessionid 0x30be3d929102460
>>>>>> 2020-09-01 11:50:52,882 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Added SubmittedJobGraph(980d3ff229b7fbfe889e2bc93e526da0) to ZooKeeper.
>>>>>>
>>>>>> Thank You.
>>>>>>
>>>>>> On Wed, Sep 2, 2020 at 2:06 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Prakhar,
>>>>>>>
>>>>>>> have you enabled HA for your cluster? If yes, then Flink will try to
>>>>>>> store the job graph to the configured high-availability.storageDir in
>>>>>>> order to be able to recover it. If this operation takes long, then it
>>>>>>> is either the filesystem which is slow or storing the pointer in
>>>>>>> ZooKeeper.
>>>>>>> If it is the filesystem, then I would suggest checking whether you
>>>>>>> have some read/write quotas which might slow the operation down.
>>>>>>>
>>>>>>> If you haven't enabled HA, or persisting the JobGraph is not what
>>>>>>> takes long, then the next most likely candidate is the recovery from
>>>>>>> a previous checkpoint. Here again, Flink needs to read from the
>>>>>>> remote storage (in your case GCS). Depending on the size of the
>>>>>>> checkpoint and the read bandwidth, this can be faster or slower. The
>>>>>>> best way to figure out what takes long is to share the logs with us
>>>>>>> so that we can confirm it.
>>>>>>>
>>>>>>> To sum it up, the job submission is most likely slow because of the
>>>>>>> interplay of Flink with the external system (most likely your
>>>>>>> configured filesystem). If the filesystem is somewhat throttled, then
>>>>>>> Flink cannot do much about it.
>>>>>>>
>>>>>>> What you could try is to check whether your jar contains dependencies
>>>>>>> which are not needed (e.g. Flink dependencies which are usually
>>>>>>> provided by the system). That way you could decrease the size of the
>>>>>>> jar a bit.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Sep 2, 2020 at 9:48 AM Prakhar Mathur <prakha...@gojek.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We are currently running Flink 1.9.0. We see a delay of around 20
>>>>>>>> seconds when starting a job on a session Flink cluster. We start the
>>>>>>>> job using Flink's monitoring REST API, with our jar already uploaded
>>>>>>>> to the Job Manager. Our jar file size is around 200 MB. We are using
>>>>>>>> the memory state backend with GCS as remote storage.
>>>>>>>>
>>>>>>>> On running the cluster in debug mode, we observed that generating
>>>>>>>> the plan itself takes around 6 seconds and copying the job graph
>>>>>>>> from local to the remote folder takes around 10 seconds.
>>>>>>>>
>>>>>>>> We were wondering whether this delay is expected or whether it can
>>>>>>>> be reduced by tweaking any configuration.
>>>>>>>>
>>>>>>>> Thank you.
>>>>>>>> Regards,
>>>>>>>> Prakhar Mathur
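
To compare like for like with the gsutil test, we are also planning to time a ~200 MB write through Flink's FileSystem abstraction, which is the same path FileSystemBlobStore takes. The sketch below is only illustrative: the bucket path is a placeholder, the class name is ours, and it assumes the GCS filesystem is available on the classpath in the same way it is for the Job Manager.

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;

import java.util.Random;

/** Rough throughput check through Flink's FileSystem API (sketch only). */
public class GcsWriteCheck {

    public static void main(String[] args) throws Exception {
        // Placeholder target -- point this at the HA storage bucket.
        Path target = new Path("gs://<test-bucket>/throughput-test/200mb.bin");

        byte[] chunk = new byte[4 * 1024 * 1024]; // 4 MiB of random data
        new Random().nextBytes(chunk);
        long totalBytes = 200L * 1024 * 1024;     // ~200 MB, like the jar

        FileSystem fs = target.getFileSystem();
        long start = System.nanoTime();
        try (FSDataOutputStream out = fs.create(target, WriteMode.OVERWRITE)) {
            long written = 0;
            while (written < totalBytes) {
                out.write(chunk);
                written += chunk.length;
            }
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.printf("Wrote %d MiB in %d ms (%.1f MiB/s)%n",
                totalBytes >> 20, elapsedMs,
                (totalBytes / 1048576.0) / (elapsedMs / 1000.0));
    }
}

If this reports throughput far below the 166.8 MiB/s we saw with gsutil, the slowdown is probably in the filesystem implementation or its configuration rather than in GCS itself.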