[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r470772348

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,12 +1048,36 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            windowStopIndex);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindow != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            windowStopIndex);
       }
     }
     return null;
   }
 
+  private Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) {

Review comment:
nit: make the method static

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r467272382

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
Yes, assuming you meant to say that truncate is on window2.
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r467182990

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
I did forget something: `work_completed`/`work_remaining` are lists of progress values, so you can report one for each active element. The protocol therefore did not forget about this in-progress issue.

This could change our suggestion for how to report progress for window-observing transforms, or we can still go with the idea of pushing it up to the gRPC read.
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r467178630

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
       }
     }
     return null;
   }
 
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
+      }
+    }
+    return null;
+  }
+
+  private Progress scaleProgress(Progress progress, int completedWindowIndex, int windowCount) {

Review comment:
For the future:
```suggestion
  private Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) {
```
Note that it is important that it isn't the number of completed windows but the current window index, otherwise the math doesn't work out.

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
       }
     }
     return null;
   }
 
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
+      }
+    }
+    return null;
+  }
+
+  private Progress scaleProgress(Progress progress, int completedWindowIndex, int windowCount) {
+    double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();

Review comment:
`totalWork` -> `totalWorkPerWindow`

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());

Review comment:
This should be the `stop index`, since splitting limits the current window iterator and the progress will be wrong after a split happens.
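The three comments above come down to a small piece of arithmetic. What follows is a minimal, self-contained sketch of it, not the code in `FnApiDoFnRunner`: the class `ScaleProgressSketch` and its nested `Progress` value class are hypothetical stand-ins (Beam's own `Progress` type is not used), and the sketch assumes every window carries the same amount of work. The parameter names follow the suggestion, and the per-window total is named `totalWorkPerWindow`.

```java
/** Hypothetical sketch for illustration; not the code in FnApiDoFnRunner. */
public class ScaleProgressSketch {

  /** Stand-in for a (workCompleted, workRemaining) pair; an assumption of this sketch. */
  public static final class Progress {
    public final double workCompleted;
    public final double workRemaining;

    public Progress(double workCompleted, double workRemaining) {
      this.workCompleted = workCompleted;
      this.workRemaining = workRemaining;
    }
  }

  /**
   * Scales the progress inside the current window so that it covers all windows in
   * [0, stopWindowIndex), assuming each window has the same total amount of work.
   */
  public static Progress scaleProgress(
      Progress progress, int currentWindowIndex, int stopWindowIndex) {
    if (currentWindowIndex < 0 || currentWindowIndex >= stopWindowIndex) {
      throw new IllegalArgumentException("current window index must be in [0, stop index)");
    }
    double totalWorkPerWindow = progress.workCompleted + progress.workRemaining;
    // Every window before the current one is fully processed.
    double completed = totalWorkPerWindow * currentWindowIndex + progress.workCompleted;
    // Every window after the current one, up to the stop index, is untouched.
    double remaining =
        totalWorkPerWindow * (stopWindowIndex - currentWindowIndex - 1)
            + progress.workRemaining;
    return new Progress(completed, remaining);
  }

  public static void main(String[] args) {
    // 25% of the way through the second of four windows.
    Progress scaled = scaleProgress(new Progress(0.25, 0.75), 1, 4);
    System.out.println(scaled.workCompleted + " done, " + scaled.workRemaining + " left");
  }
}
```

With a stop index of 4 the sketch reports 1.25 completed and 2.75 remaining. If a split lowers the stop index to 2, the same call reports 1.25 and 0.75, which illustrates why passing the original window count instead of the stop index would overstate the remaining work after a split.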
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465902065

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
Originally the idea was that we didn't want the SDK to have to perform these calculations, which is why each operator was going to report work_completed/work_remaining if it had them. But it seems that accurate splitting by fraction needs to take this into account.

Using the graph to compute the progress shouldn't be any more or less difficult than the work that is being put into the SDK. Is there still value in reporting the work_completed/work_remaining metrics then?
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465855425

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
Yes, truncate exposes some of the issues where a non-root SDF makes things interesting.

For option 2, if we assume that `inprogress` is included in `remaining` then:
```
total = completed + remaining
fraction_completed = (inprogress / remaining) * downstream_fraction_completed + completed / total
```

`downstream_fraction_completed` would be computed recursively, and could also be computed effectively if there were multiple consumers with `(completed_consumers + downstream_fraction_completed) / total_num_consumers`
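The formula in the comment above can be transcribed directly. The sketch below does only that: it follows the expressions as written in the comment, including the `inprogress / remaining` weight on the downstream fraction, and makes no claim about how a runner actually computes progress. Every name in it (`FractionCompletedSketch`, `fractionCompleted`, `downstreamFraction`) is hypothetical and not part of Beam or the Fn API, and the handling of a zero `remaining` is an assumption added so the code does not divide by zero.

```java
/** Hypothetical transcription of the formula in the review comment; not Beam code. */
public class FractionCompletedSketch {

  /**
   * Combines several consumers as written in the comment:
   * (completed_consumers + downstream_fraction_completed) / total_num_consumers.
   */
  public static double downstreamFraction(
      int completedConsumers, double currentConsumerFraction, int totalConsumers) {
    if (totalConsumers <= 0) {
      throw new IllegalArgumentException("need at least one consumer");
    }
    return (completedConsumers + currentConsumerFraction) / totalConsumers;
  }

  /**
   * fraction_completed = (inprogress / remaining) * downstream_fraction_completed
   *     + completed / total, where total = completed + remaining and inprogress is
   * assumed to be included in remaining.
   */
  public static double fractionCompleted(
      double completed, double remaining, double inProgress, double downstreamFractionCompleted) {
    double total = completed + remaining;
    if (remaining == 0) {
      // Assumption of this sketch: nothing remaining means done (or no work known at all).
      return total == 0 ? 0.0 : 1.0;
    }
    return (inProgress / remaining) * downstreamFractionCompleted + completed / total;
  }

  public static void main(String[] args) {
    // Two downstream consumers: one finished, the other half done.
    double downstream = downstreamFraction(1, 0.5, 2);
    // 2 units completed, 8 remaining, 1 of those 8 currently in progress downstream.
    System.out.println(fractionCompleted(2, 8, 1, downstream));
  }
}
```

In a multi-stage graph, `downstreamFractionCompleted` would itself come from calling `fractionCompleted` on the consuming transform, which is the recursion the comment describes.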
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465304871

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
Thinking about this more, I do believe that the progress does need to be reported as a metric so that a runner can choose a split fraction and also compute the amount of remaining work and/or a completion time estimate.

It looks like we need to do one of the following:
1) Make work completed/remaining take into account downstream processing
OR
2) Add a metric that represents work in progress so that a runner can compute the amount of work being done (without this we can't figure out how much the work remaining downstream is relative to an upstream node).

I'm not a big fan of 1) since it means that this metric is intrinsically tied to the state of other transforms, while in 2) we are adding something new.
[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.
lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r464673014

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -515,6 +515,9 @@ && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
           mainInputConsumer = new SplittableFnDataReceiver() {
+            private final HandlesSplits splitDelegate =
+                (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
nit: here and below around line 558
```suggestion
                (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
```

## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ##

@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
Should we register this with `addProgressRequestCallback` so we generate monitoring infos?

I'm not sure if truncate should be using the downstream progress as part of its calculation when reporting it as a monitoring info. I know that this differs from how we calculate the progress/split point for the SplittableFnDataReceiver, since the singular fraction needs to take into account the downstream progress accurately.

I was always envisioning that work completed/work remaining for the monitoring infos always represented the local knowledge of work and didn't take into account any downstream/upstream knowledge. We can avoid this issue if we merge this logic into the `getProgress` method around line 533.
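On the `Iterables.get(mainOutputConsumers, 0)` nit in this message: a minimal sketch of the behavioural difference, written against the standard library only. `OnlyElementSketch.getOnlyElement` is a hypothetical stand-in that mirrors the documented contract of Guava's `Iterables.getOnlyElement` (`NoSuchElementException` for an empty iterable, `IllegalArgumentException` for more than one element); it is not Guava's implementation, and the string list stands in for the consumer list.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical stdlib-only illustration; not Guava and not Beam code. */
public class OnlyElementSketch {

  /** Returns the single element, rejecting empty and multi-element inputs. */
  public static <T> T getOnlyElement(Iterable<T> iterable) {
    Iterator<T> it = iterable.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("expected exactly one element but found none");
    }
    T first = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected exactly one element but found more");
    }
    return first;
  }

  public static void main(String[] args) {
    List<String> consumers = Arrays.asList("first", "second");
    // Index-based access quietly ignores the extra consumer.
    System.out.println("get(0): " + consumers.get(0));
    try {
      getOnlyElement(consumers);
    } catch (IllegalArgumentException e) {
      // The "only element" variant makes the single-consumer assumption explicit.
      System.out.println("getOnlyElement rejected the list: " + e.getMessage());
    }
  }
}
```

The design point is that the "only element" form fails loudly if the single-consumer assumption is ever violated, whereas taking index 0 would silently pick one consumer.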