Not sure if you've seen this, but Flink's file systems do support connection limiting.
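For reference, a minimal sketch of what that could look like in flink-conf.yaml, assuming your checkpoint files are accessed through the gs:// scheme (the fs.<scheme>.limit.* option names are described on the page linked below; the values here are placeholders, not tuned recommendations):

```
# Hedged example: cap the number of concurrent streams the gs:// file system may open.
# The fs.<scheme>.limit.* options are documented under "Connection limiting" in the Flink docs;
# the gs scheme and the numbers below are assumptions for illustration only.
fs.gs.limit.total: 64              # max concurrent streams overall (0 or -1 = no limit)
fs.gs.limit.input: 32              # max concurrent input (read) streams
fs.gs.limit.output: 32             # max concurrent output (write) streams
fs.gs.limit.timeout: 60000         # ms to wait for a free slot before the open fails (0 = infinite)
fs.gs.limit.stream-timeout: 60000  # ms of inactivity before an open stream is forcibly closed (0 = infinite)
```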
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/common/#connection-limiting

Seth

On Wed, Dec 8, 2021 at 12:18 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hey David,
>
> Thanks for the response. The retry eventually succeeds, but I was wondering
> whether anyone in the community has done anything to avoid GCS/S3
> rate-limiting issues. The retries do make it take longer for all the task
> managers to recover and register.
>
> On Mon, Dec 6, 2021 at 3:42 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> Flink comes with two schedulers for streaming:
>> - Default
>> - Adaptive (opt-in)
>>
>> Adaptive is still in an experimental phase and doesn't support local
>> recovery. You're most likely using the first one, so you should be OK.
>>
>>> Can you elaborate on this a bit? We aren't changing the parallelism when
>>> restoring.
>>
>> Splitting / merging a RocksDB-based operator checkpoint is currently an
>> expensive operation. If the parallelism remains unchanged you should be
>> OK; the majority of the operator state restore time will be spent
>> downloading the RocksDB snapshot.
>>
>>> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>>> parallelism of 512.
>>
>> This could definitely generate a lot of concurrent requests when
>> restoring the state.
>>
>> Does the restore operation fail, or is the retry mechanism sufficient to
>> work around this?
>>
>> D.
>>
>> On Thu, Dec 2, 2021 at 7:54 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for your response.
>>>
>>> What's the DefaultScheduler you're referring to? Is that available in
>>> Flink 1.13.1 (the version we are using)?
>>>
>>>> How large is the state you're restoring from / how many TMs does the
>>>> job consume / what is the parallelism?
>>>
>>> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>>> parallelism of 512.
>>>
>>>> Also, things could get even worse if the parallelism that has been used
>>>> for taking the checkpoint is different from the one you're trying to
>>>> restore with (especially with RocksDB).
>>>
>>> Can you elaborate on this a bit? We aren't changing the parallelism when
>>> restoring.
>>>
>>> On Thu, Dec 2, 2021 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> This happens only when the pipeline is started up from a savepoint /
>>>> retained checkpoint, right? Guessing from the "path" you've shared, it
>>>> seems like a RocksDB-based retained checkpoint. In this case all task
>>>> managers need to pull state files from the object storage in order to
>>>> restore. This can indeed be a heavy operation, especially when
>>>> restoring a large state with high parallelism.
>>>>
>>>> Recovery from failure should be faster (with the DefaultScheduler), as
>>>> we can re-use the local files that are already present on the
>>>> TaskManagers.
>>>>
>>>> How large is the state you're restoring from / how many TMs does the
>>>> job consume / what is the parallelism?
>>>>
>>>> Also, things could get even worse if the parallelism that has been used
>>>> for taking the checkpoint is different from the one you're trying to
>>>> restore with (especially with RocksDB).
>>>>
>>>> Best,
>>>> D.
>>>> On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We're running a large (256 task managers with 4 task slots each) Flink
>>>>> cluster with High Availability enabled, on Kubernetes, and we use
>>>>> Google Cloud Storage (GCS) as our object storage for the HA metadata.
>>>>> In addition, our Flink application writes out to GCS from one of its
>>>>> sinks via the streaming file sink + GCS connector.
>>>>>
>>>>> We observed the following types of errors when running our application:
>>>>>
>>>>> ```
>>>>> INFO: Encountered status code 429 when sending GET request to URL '
>>>>> https://storage.googleapis.com/download/storage/v1/b/<redacted>/o/<redacted>checkpoints%2F00000000000000000000000000000000%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media&generation=1638448883568946'.
>>>>> Delegating to response handler for possible retry. [CONTEXT
>>>>> ratelimit_period="10 SECONDS [skipped: 8]" ]
>>>>> ```
>>>>>
>>>>> ```
>>>>> INFO: Encountered status code 503 when sending POST request to URL '
>>>>> https://storage.googleapis.com/upload/storage/v1/b/<redacted>/o?uploadType=multipart'.
>>>>> Delegating to response handler for possible retry.
>>>>> ```
>>>>>
>>>>> They typically happen upon cluster start-up, when all the task
>>>>> managers are registering with the jobmanager. We've also seen them
>>>>> occur as a result of output from our sink operator.
>>>>>
>>>>> Has anyone else encountered similar issues? Any practices you can
>>>>> suggest?
>>>>>
>>>>> Advice appreciated!
>>>>>
>>>>> Thanks