[GitHub] [flink] curcur edited a comment on pull request #17229: [FLINK-23381][state] Back-pressure on reaching state change to upload limit

2021-11-03 Thread GitBox


curcur edited a comment on pull request #17229:
URL: https://github.com/apache/flink/pull/17229#issuecomment-958666854


   Hey folks, I’ve spent some time reading through the `FsStateChangelogWriter` and the related `DFS` uploading implementation. I now have a somewhat better understanding, but also more questions (we can discuss offline if necessary).
   
   Before posting my questions, let me briefly describe my understanding (the questions build on it).
   
   Please correct me if I have misunderstood anything:
   
   1. `StateChangeUploader` is per TM, shared amongst all tasks belonging to the same job on that TM.
   2. Each `ChangelogKeyedStateBackend` has a separate writer (`StateChangelogWriter`) that maintains an in-memory `activeChangeSet` to which it writes.
   3. Persist (`persistInternal`) happens in two cases: 1) enough data has accumulated in memory (`preEmptivePersistThresholdInBytes`); 2) checkpointing.
   4. Persist puts an `UploadTask` onto the `StateChangeUploader`'s upload queue, where it waits to be scheduled for upload.
   5. All of the above happens in the task thread.
   6. The scheduling thread (`SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG)`) then schedules the uploads (taking `UploadTask`s off the `StateChangeUploader`'s upload queue).
   7. The IO thread (`retryingExecutor.execute(retryPolicy, () -> delegate.upload(tasks))`) executes the upload tasks.
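
For concreteness, steps 4-7 above can be sketched as a tiny producer/consumer pipeline. All class and method names here are invented for illustration; this is not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class UploadPipelineSketch {
    // A stand-in for an UploadTask: just the serialized state changes.
    static final class UploadTask {
        final byte[] payload;
        UploadTask(byte[] payload) { this.payload = payload; }
    }

    // Shared per-TM queue: task threads enqueue, the scheduler thread drains.
    private final BlockingQueue<UploadTask> uploadQueue = new LinkedBlockingQueue<>();

    // Step 4: the task thread puts an UploadTask on the queue during persist().
    public void persist(byte[] changeSet) {
        uploadQueue.add(new UploadTask(changeSet));
    }

    // Step 6: the scheduler thread takes queued tasks off in one batch,
    // which would then be handed to the IO thread (step 7) for uploading.
    public List<UploadTask> drainBatch() {
        List<UploadTask> batch = new ArrayList<>();
        uploadQueue.drainTo(batch);
        return batch;
    }
}
```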
   
   ---
   **Questions (about availability provider)**
   
   1. The `UploadThrottle` (availability provider) is shared amongst all tasks belonging to the same job, and every availability check has to grab its lock. Could that become a performance issue? From what I can see, the current `RecordWriter` and `InputProcessor` are per task and lock-free; should we consider making the changelog availability provider per task as well?
   2. In `changelogWriterAvailabilityProvider.isApproximatelyAvailable()`, the `getAvailableFuture() == AVAILABLE` check happens outside the lock, so the future may complete concurrently while the check is being made?
   3. Even if the available future is complete, it only means that some task can proceed with updating state, not necessarily all tasks (other tasks might still be blocked waiting).
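
To make question 2 concrete, here is a minimal sketch of such a provider, assuming a completed sentinel future as the "available" marker (all names are invented, not the actual Flink classes). The unlocked fast path can read a stale value, which is presumably why it is only "approximately" available:

```java
import java.util.concurrent.CompletableFuture;

public class AvailabilitySketch {
    // Sentinel: an already-completed future means "available".
    public static final CompletableFuture<Void> AVAILABLE =
            CompletableFuture.completedFuture(null);

    private final Object lock = new Object();
    private CompletableFuture<Void> availableFuture = AVAILABLE;

    // Approximate check: no lock, may race with markBusy()/markAvailable(),
    // so the result can be stale by the time the caller acts on it.
    public boolean isApproximatelyAvailable() {
        return availableFuture == AVAILABLE;
    }

    // Exact check: reads under the same lock the writers use.
    public boolean isAvailable() {
        synchronized (lock) {
            return availableFuture == AVAILABLE;
        }
    }

    // Capacity exhausted: swap in an incomplete future for waiters.
    public void markBusy() {
        synchronized (lock) {
            if (availableFuture == AVAILABLE) {
                availableFuture = new CompletableFuture<>();
            }
        }
    }

    // Capacity freed: restore the sentinel and wake anyone waiting.
    public void markAvailable() {
        synchronized (lock) {
            CompletableFuture<Void> previous = availableFuture;
            availableFuture = AVAILABLE;
            previous.complete(null);
        }
    }
}
```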
   
   **Questions (about the locks)**
   I found three locks in three different classes (not sure whether there are more):
   `UploadThrottle`
   `BatchingStateChangeUploader`
   `FsStateChangelogWriter`
   
   But from the description above (the "please correct me" paragraph), there is only one critical piece of shared state: the `StateChangeUploader`'s upload queue.
   
   1. Several writers need to put `UploadTask`s into the queue.
   2. The scheduler needs to take the tasks off the queue.
   3. The `AvailabilityProvider` needs to check the availability of the queue.
   
   So, from what I can see, these three locks could be merged into one? Is there any reason to keep them separate? As discussed, more locks make deadlocks easier to introduce and the code harder to reason about.
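
The single-lock idea could look roughly like this: one monitor guards the shared queue for all three uses. This is a hedged sketch with invented names, not a proposal for the actual classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class SingleLockUploadQueue {
    private final ArrayDeque<byte[]> queue = new ArrayDeque<>();
    private final long capacityBytes;
    private long queuedBytes;

    public SingleLockUploadQueue(long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }

    // 1. Writers put upload tasks into the queue.
    public synchronized void enqueue(byte[] task) {
        queue.add(task);
        queuedBytes += task.length;
    }

    // 2. The scheduler takes the tasks off the queue.
    public synchronized List<byte[]> drain() {
        List<byte[]> out = new ArrayList<>(queue);
        queue.clear();
        queuedBytes = 0;
        return out;
    }

    // 3. The availability check inspects the same state under the same lock.
    public synchronized boolean isAvailable() {
        return queuedBytes < capacityBytes;
    }
}
```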
   
   **Questions (about the backpressure metric)**
   They are not mutually exclusive, but we can postpone this discussion until we design the metrics.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] curcur edited a comment on pull request #17229: [FLINK-23381][state] Back-pressure on reaching state change to upload limit

2021-11-01 Thread GitBox


curcur edited a comment on pull request #17229:
URL: https://github.com/apache/flink/pull/17229#issuecomment-955988634


   Thanks @rkhachatryan for the PR. I have a few high-level and implementation questions; let's discuss offline:
   
   - Item 1
   `seizeCapacity`: called from multiple task threads?
   
   1. `hasCapacity()`: `inFlightBytesCounter < maxBytesInFlight`;
      `isAvailable()`: what is that, and how does it differ from `hasCapacity()`?
   
   2. Even after being woken up, a caller is still not guaranteed enough capacity: `inFlightBytesCounter + bytes` can still exceed the limit. What is the purpose of splitting `seizeCapacity` into these two steps?
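
A toy model of this throttle, assuming `seizeCapacity` waits on `hasCapacity()` and then adds unconditionally (the names mirror the PR's, but the code is purely illustrative): after a wake-up, `inFlightBytesCounter + bytes` may indeed overshoot `maxBytesInFlight`, which is presumably what lets one oversized task make progress rather than block forever:

```java
public class ThrottleSketch {
    private final long maxBytesInFlight;
    private long inFlightBytes;

    public ThrottleSketch(long maxBytesInFlight) {
        this.maxBytesInFlight = maxBytesInFlight;
    }

    // Step 1: only checks that SOME capacity remains, not that the
    // requested bytes fit entirely.
    private boolean hasCapacity() {
        return inFlightBytes < maxBytesInFlight;
    }

    // Task thread: block until some capacity exists, then seize it,
    // possibly pushing inFlightBytes past the limit.
    public synchronized void seizeCapacity(long bytes) throws InterruptedException {
        while (!hasCapacity()) {
            wait();
        }
        inFlightBytes += bytes; // step 2: unconditional add -> overshoot possible
    }

    // IO thread: release capacity after the upload completes and wake waiters.
    public synchronized void releaseCapacity(long bytes) {
        inFlightBytes -= bytes;
        notifyAll();
    }

    public synchronized long getInFlightBytes() {
        return inFlightBytes;
    }
}
```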

   - Item 2
   `releaseCapacity`: called from the IO thread; a single IO thread or multiple?
   How is it related to the `upload` task scheduler?
   
   `UploadThrottle` -- `AvailabilityProvider`
   `BatchingStateChangeUploader` contains an `AvailabilityProvider`
   
   - Item 3
   `BatchingStateChangeUploader.upload`:
   even after grabbing the lock and seizing capacity, it may still not be able to upload all bytes.
   Why is it designed this way?
   
   - Item 4
   `BatchingStateChangeUploader` per `FsStateChangelogStorage`, included in `TaskStateManager`;
   per TM.
   
   - Item 5
   Why is it allowed to be null?
   `environment.getTaskStateManager().getStateChangelogStorage() == null`
   
   - Item 6
   Backpressure now seems to have one more state:
   `idleTimeMsPerSecond`
   `busyTimeMsPerSecond`
   `backPressuredTimeMsPerSecond`
   -- backpressure because of the downstream vs. backpressure because of changelog IO
   
   - Item 7
   I found several locks in a few different places. Are they all necessary? (Asking because I did not review the DFS PR, so I need a bit more input.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



