Not sure if you've seen this, but Flink's file systems do support connection
limiting.

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/common/#connection-limiting
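
For example, something like the following in flink-conf.yaml caps the number of
concurrent streams Flink opens against GCS. The option names are from the docs
linked above; the values are purely illustrative and assume your checkpoints
live under a gs:// path:

```
# connection limits for the gs:// scheme, enforced per TaskManager process
fs.gs.limit.total: 32              # max concurrent input + output streams
fs.gs.limit.input: 16              # max concurrent read streams
fs.gs.limit.output: 16             # max concurrent write streams
fs.gs.limit.timeout: 60000         # ms to wait for a free stream slot before failing
fs.gs.limit.stream-timeout: 60000  # ms of inactivity before a stream is considered stale
```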

Seth

On Wed, Dec 8, 2021 at 12:18 PM Kevin Lam <kevin....@shopify.com> wrote:

> Hey David,
>
> Thanks for the response. The retry eventually succeeds, but I was
> wondering if there was anything that people in the community have done to
> avoid GCS/S3 rate-limiting issues. The retries do result in it taking
> longer for all the task managers to recover and register.
>
> On Mon, Dec 6, 2021 at 3:42 AM David Morávek <d...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> Flink comes with two schedulers for streaming:
>> - Default
>> - Adaptive (opt-in)
>>
>> Adaptive is still in an experimental phase and doesn't support local
>> recovery. You're most likely using the first one, so you should be OK.
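>>
>> For reference, the scheduler is chosen in flink-conf.yaml; if nothing is set
>> you get the default scheduler. A minimal sketch (Flink 1.13+):
>>
>> ```
>> # opt in to the experimental adaptive scheduler; leave unset to keep the default scheduler
>> jobmanager.scheduler: adaptive
>> ```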
>>
>> Can you elaborate on this a bit? We aren't changing the parallelism when
>>> restoring.
>>>
>>
>> Splitting / merging of a RocksDB-based operator checkpoint is currently
>> an expensive operation. If the parallelism remains unchanged, you should be
>> OK; the majority of the operator state restore time will be spent
>> downloading the RocksDB snapshot.
>>
>> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>>> parallelism of 512.
>>>
>>
>> This could definitely generate a lot of concurrent requests when restoring
>> the state.
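>>
>> The number of concurrent snapshot downloads also scales with the RocksDB
>> transfer-thread setting, so you can dial it down to take pressure off GCS.
>> An illustrative flink-conf.yaml sketch (value is an example, not a recommendation):
>>
>> ```
>> # threads per stateful operator used to download/upload RocksDB snapshot files;
>> # a lower value means fewer concurrent GCS requests but a slower restore
>> state.backend.rocksdb.checkpoint.transfer.thread.num: 2
>> ```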
>>
>> Does the restore operation fail, or is the retry mechanism sufficient to
>> work around this?
>>
>> D.
>>
>> On Thu, Dec 2, 2021 at 7:54 PM Kevin Lam <kevin....@shopify.com> wrote:
>>
>>> Hi David,
>>>
>>> Thanks for your response.
>>>
>>> What's the DefaultScheduler you're referring to? Is that available in
>>> Flink 1.13.1 (the version we are using)?
>>>
>>> How large is the state you're restoring from / how many TMs does the job
>>>> consume / what is the parallelism?
>>>
>>>
>>> Our checkpoint is about 900GB, and we have 256 TaskManagers with a
>>> parallelism of 512.
>>>
>>> Also things could get even worse if the parallelism that has been used
>>>> for taking the checkpoint is different from the one you're trying to
>>>> restore with (especially with RocksDB).
>>>>
>>>
>>> Can you elaborate on this a bit? We aren't changing the parallelism when
>>> restoring.
>>>
>>> On Thu, Dec 2, 2021 at 10:48 AM David Morávek <d...@apache.org> wrote:
>>>
>>>> Hi Kevin,
>>>>
>>>> this happens only when the pipeline is started up from a savepoint /
>>>> retained checkpoint, right? Guessing from the "path" you've shared, it seems
>>>> like a RocksDB-based retained checkpoint. In this case all task managers
>>>> need to pull state files from the object storage in order to restore. This
>>>> can indeed be a heavy operation, especially when restoring a large state
>>>> with high parallelism.
>>>>
>>>> Recovery from failure should be faster (with DefaultScheduler) as we
>>>> can re-use the local files that are already present on TaskManagers.
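>>>>
>>>> Re-using local files relies on task-local recovery, which has to be enabled
>>>> explicitly. A minimal flink-conf.yaml sketch (paths illustrative):
>>>>
>>>> ```
>>>> # keep a local copy of task state so failure recovery can skip the download
>>>> state.backend.local-recovery: true
>>>> # optional: where the local copies are stored (TaskManager temp dirs by default)
>>>> taskmanager.state.local.root-dirs: /data/flink/local-recovery
>>>> ```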
>>>>
>>>> How large is the state you're restoring from / how many TMs does the
>>>> job consume / what is the parallelism?
>>>>
>>>> Also things could get even worse if the parallelism that has been used
>>>> for taking the checkpoint is different from the one you're trying to
>>>> restore with (especially with RocksDB).
>>>>
>>>> Best,
>>>> D.
>>>>
>>>> On Thu, Dec 2, 2021 at 4:29 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> We're running a large (256 task managers with 4 task slots each) Flink
>>>>> cluster with High Availability enabled, on Kubernetes, and use Google Cloud
>>>>> Storage (GCS) as our object storage for the HA metadata. In addition, our
>>>>> Flink application writes out to GCS from one of its sinks via the streaming
>>>>> file sink + GCS connector.
>>>>>
>>>>> We observed the following types of errors when running our application:
>>>>>
>>>>> ```
>>>>>
>>>>> INFO: Encountered status code 429 when sending GET request to URL '
>>>>> https://storage.googleapis.com/download/storage/v1/b/<redacted>/o/<redacted>checkpoints%2F00000000000000000000000000000000%2Fshared%2F13721c52-18d8-4782-80ab-1ed8a15d9ad5?alt=media&generation=1638448883568946'.
>>>>> Delegating to response handler for possible retry. [CONTEXT
>>>>> ratelimit_period="10 SECONDS [skipped: 8]" ]
>>>>>
>>>>> ```
>>>>>
>>>>> ```
>>>>>  INFO: Encountered status code 503 when sending POST request to URL '
>>>>> https://storage.googleapis.com/upload/storage/v1/b/<redacted>/o?uploadType=multipart'.
>>>>> Delegating to response handler for possible retry.
>>>>> ```
>>>>>
>>>>> They typically happen upon cluster start-up, when all the task managers
>>>>> are registering with the jobmanager. We've also seen them occur as a
>>>>> result of output from our sink operator.
>>>>>
>>>>> Has anyone else encountered similar issues? Any practices you can
>>>>> suggest?
>>>>>
>>>>> Advice appreciated!
>>>>>
>>>>> Thanks
>>>>>
>>>>
