[GitHub] [flink] curcur edited a comment on pull request #17229: [FLINK-23381][state] Back-pressure on reaching state change to upload limit
curcur edited a comment on pull request #17229: URL: https://github.com/apache/flink/pull/17229#issuecomment-958666854

Hey folks, I've spent some time reading through the `FsStateChangelogWriter` and the related DFS uploading implementation. I now understand it better, but I also have more questions (we can discuss offline if necessary). Before posting my questions, let me briefly describe my understanding, since the questions build on it. Please correct me if I misunderstand anything:

1. `StateChangeUploader` is per TM, shared among all tasks of the same job on that TM.
2. Each `ChangelogKeyedStateBackend` has its own writer (`StateChangelogWriter`), which maintains an in-memory `activeChangeSet` that the writer appends to.
3. Persisting (`persistInternal`) happens in two cases: 1) enough data has accumulated in memory (`preEmptivePersistThresholdInBytes`); 2) checkpointing.
4. Persisting puts an `UploadTask` into the `StateChangeUploader`'s upload queue, where it waits to be scheduled for upload.
5. All of the above happens in the task thread.
6. The scheduling thread (`SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG)`) then schedules uploads by taking `UploadTask`s off the `StateChangeUploader`'s upload queue.
7. The IO thread (`retryingExecutor.execute(retryPolicy, () -> delegate.upload(tasks))`) executes the upload tasks.

**Questions (about the availability provider)**

1. The `UploadThrottle` (availability provider) is shared among all tasks of the same job, and every availability check has to grab the lock. Could this be a performance issue? From what I can see, the current `RecordWriter` and `InputProcessor` are per task and lock-free; should we consider implementing the changelog availability provider per task as well?
2. In `changelogWriterAvailabilityProvider.isApproximatelyAvailable()`, the `getAvailableFuture() == AVAILABLE` check happens outside the lock, so the available future may complete concurrently with the check?
3. Even if the available future is complete, it only means *some* task can proceed with updating state, not necessarily all tasks (other tasks might still be blocked waiting).

**Questions (about the locks)**

I found three locks in three different classes (not sure whether there are more): `UploadThrottle`, `BatchingStateChangeUploader`, and `FsStateChangelogWriter`. But from the description above (the "Please correct me if I misunderstand" part), there is only one critical piece of shared state: the `StateChangeUploader`'s upload queue.

1. Several writers need to put `UploadTask`s into the queue.
2. The scheduler needs to take tasks from the queue.
3. The `AvailabilityProvider` needs to check the availability of the queue.

So, from what I can see, these three locks could be merged into one? Is there any reason to keep them separate? As discussed, more locks make deadlock easier and the code harder to reason about.

**Questions (about the backpressure metric)**

They are not mutually exclusive, but we can postpone this discussion until we design the metrics.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
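To make the single-lock suggestion concrete, here is a hypothetical sketch (not Flink code; all names are made up) of one queue protected by one lock that covers all three responsibilities listed above: writers enqueue and are back-pressured on a byte budget, the scheduler drains, the IO thread releases capacity, and the availability check reads the same counter.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-lock upload queue: one monitor guards the queue,
// the in-flight byte counter, and the availability check.
final class SingleLockUploadQueue {
    private final Object lock = new Object();
    private final Queue<byte[]> queue = new ArrayDeque<>();
    private final long maxBytesInFlight;
    private long bytesInFlight;

    SingleLockUploadQueue(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    /** Task thread: blocks (back-pressure) while the byte budget is exhausted. */
    void enqueue(byte[] payload) {
        synchronized (lock) {
            while (bytesInFlight >= maxBytesInFlight) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // give up on interrupt; sketch only
                }
            }
            bytesInFlight += payload.length;
            queue.add(payload);
        }
    }

    /** Scheduler thread: takes everything currently queued. */
    List<byte[]> drain() {
        synchronized (lock) {
            List<byte[]> batch = new ArrayList<>(queue);
            queue.clear();
            return batch; // bytes stay "in flight" until the upload finishes
        }
    }

    /** IO thread: returns capacity once an upload completes. */
    void releaseCapacity(long bytes) {
        synchronized (lock) {
            bytesInFlight -= bytes;
            lock.notifyAll(); // wake blocked writers
        }
    }

    /** Availability check used by all tasks, under the same lock. */
    boolean isAvailable() {
        synchronized (lock) {
            return bytesInFlight < maxBytesInFlight;
        }
    }
}
```

Note the point from question 3 still applies in this sketch: `notifyAll` wakes every blocked writer, but only those whose payload fits the remaining budget actually proceed.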
[GitHub] [flink] curcur edited a comment on pull request #17229: [FLINK-23381][state] Back-pressure on reaching state change to upload limit
curcur edited a comment on pull request #17229: URL: https://github.com/apache/flink/pull/17229#issuecomment-955988634

Thanks @rkhachatryan for the PR, I have a few high-level + implementation questions. Let's discuss offline:

- Item 1, `seizeCapacity`: called from multiple task threads?
  1. `hasCapacity()` is `inFlightBytesCounter < maxBytesInFlight`; what is `isAvailable()`, and how is it different from `hasCapacity()`?
  2. Even after being woken up, a task is still not guaranteed to have enough capacity: `inFlightBytesCounter + bytes` may still exceed the limit. What is the purpose of splitting `seizeCapacity` into these two steps?
- Item 2, `releaseCapacity`: called from the IO thread; is there a single IO thread or multiple? How does it relate to the upload task scheduler? `UploadThrottle` is an `AvailabilityProvider`; `BatchingStateChangeUploader` contains the `AvailabilityProvider`.
- Item 3, `BatchingStateChangeUploader.upload`: even if it grabs the lock and seizes capacity, it may still not be able to upload all bytes. Why is it designed this way?
- Item 4: one `BatchingStateChangeUploader` per `FsStateChangelogStorage`, included in the `TaskStateManager`, which is per TM.
- Item 5: why is `environment.getTaskStateManager().getStateChangelogStorage() == null` allowed?
- Item 6: backpressure now seems to have one more state: `idleTimeMsPerSecond`, `busyTimeMsPerSecond`, `backPressuredTimeMsPerSecond` -- backpressure because of downstream vs. backpressure because of changelog IO.
- Item 7: I found several locks in a few different places; are they all necessary? (Asking because the DFS PR was not reviewed by me, so I need a bit more input.)
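To illustrate the overshoot raised in Item 1.2, here is a hypothetical sketch (not the PR's actual implementation; names are illustrative) of a seize/release byte throttle: a caller is admitted as soon as *any* capacity is free, so a single large task can push the counter past `maxBytesInFlight`.

```java
// Hypothetical byte-budget throttle illustrating seizeCapacity/releaseCapacity.
final class ByteBudgetThrottle {
    private final Object lock = new Object();
    private final long maxBytesInFlight;
    private long inFlightBytes;

    ByteBudgetThrottle(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    boolean hasCapacity() {
        synchronized (lock) {
            return inFlightBytes < maxBytesInFlight;
        }
    }

    /** Task thread: wait until some capacity is free, then claim the whole payload. */
    void seizeCapacity(long bytes) {
        synchronized (lock) {
            while (inFlightBytes >= maxBytesInFlight) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // sketch only: abandon the seize on interrupt
                }
            }
            // Admitted with *some* free capacity, so the counter may
            // overshoot maxBytesInFlight for a large task.
            inFlightBytes += bytes;
        }
    }

    /** IO thread: return capacity once the upload completes. */
    void releaseCapacity(long bytes) {
        synchronized (lock) {
            inFlightBytes -= bytes;
            lock.notifyAll(); // wake any writers blocked in seizeCapacity
        }
    }
}
```

Under this reading, the two-step split (wait for capacity, then claim it unconditionally) trades strict enforcement of the limit for never blocking a task whose single payload exceeds the whole budget, which seems to be the behavior Item 1.2 is asking about.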