[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-14 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r470772348



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,12 +1048,36 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            windowStopIndex);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindow != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            windowStopIndex);
       }
     }
     return null;
   }
 
+  private Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) {

Review comment:
   nit: make the method static
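A minimal sketch of what a static `scaleProgress` could look like, assuming the per-window formula from the earlier revisions of this hunk with the window count replaced by the stop index. It can be static because it reads only its arguments. `WindowProgressScaling` and its nested `Progress` are hypothetical: `Progress` is a stand-in for Beam's `RestrictionTracker.Progress` that models only `from`, `getWorkCompleted` and `getWorkRemaining`.

```java
final class WindowProgressScaling {
  private WindowProgressScaling() {}

  /** Hypothetical stand-in for RestrictionTracker.Progress (only what is used below). */
  static final class Progress {
    private final double workCompleted;
    private final double workRemaining;

    private Progress(double workCompleted, double workRemaining) {
      this.workCompleted = workCompleted;
      this.workRemaining = workRemaining;
    }

    static Progress from(double workCompleted, double workRemaining) {
      return new Progress(workCompleted, workRemaining);
    }

    double getWorkCompleted() {
      return workCompleted;
    }

    double getWorkRemaining() {
      return workRemaining;
    }
  }

  /**
   * Scales the progress of the current window's restriction to the whole element,
   * assuming every window carries the same amount of work.
   */
  static Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) {
    double totalWorkPerWindow = progress.getWorkCompleted() + progress.getWorkRemaining();
    // Windows [0, currentWindowIndex) are done, plus what is done in the current window.
    double completed = totalWorkPerWindow * currentWindowIndex + progress.getWorkCompleted();
    // Windows after the current one up to the stop index are untouched,
    // plus what is left in the current window.
    double remaining =
        totalWorkPerWindow * (stopWindowIndex - currentWindowIndex - 1)
            + progress.getWorkRemaining();
    return Progress.from(completed, remaining);
  }
}
```

For example, with 2 units done and 8 left in window 1 of windows [0, 3), this reports 12 completed and 18 remaining, i.e. 30 units for three equally sized windows.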





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-07 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r467272382



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
   Yes, assuming you meant to say that truncate is on window2.









[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-07 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r467182990



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
   I did forget something: `work_completed`/`work_remaining` are `lists` of progresses, so you can report one for each `active` element. So the protocol didn't forget about this in-progress issue after all.
   
   This could change our suggestion for how to report progress for window-observing transforms, or we can still go with the idea of pushing it up to the gRPC read.










[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-07 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r467178630



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
       }
     }
     return null;
   }
 
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
+      }
+    }
+    return null;
+  }
+
+  private Progress scaleProgress(Progress progress, int completedWindowIndex, int windowCount) {

Review comment:
   For the future:
   ```suggestion
  private Progress scaleProgress(Progress progress, int currentWindowIndex, int stopWindowIndex) {
   ```
   
   Note that it is important that this is the current window index rather than the number of completed windows; otherwise the math doesn't work out.
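Written out, the scaling amounts to the following, assuming every window of the element carries the same work $W$, the current window's restriction reports $c$ completed and $r$ remaining, $i$ is the current window index and $s$ is the stop index:

```math
\begin{aligned}
W &= c + r \\
\text{completed} &= W\,i + c \\
\text{remaining} &= W\,(s - i - 1) + r \\
\text{completed} + \text{remaining} &= W\,s
\end{aligned}
```

The remaining term subtracts $i + 1$ (the current window plus every window before it) from the stop index, so $i$ has to be the position of the current window in the element's window list for the two terms to add up to $W s$.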

##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
       }
     }
     return null;
   }
 
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());
+      }
+    }
+    return null;
+  }
+
+  private Progress scaleProgress(Progress progress, int completedWindowIndex, int windowCount) {
+    double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();

Review comment:
   `totalWork` -> `totalWorkPerWindow`

##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,12 +1040,35 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            currentElement.getWindows().size());

Review comment:
   This should be the `stop index`: splitting limits the current window iterator, so the progress will be wrong after a split happens.
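A small sketch of why the window count goes wrong after a split, assuming equal work per window. Dividing the scaled completed work $W i + c$ by the total $W s$ gives the fraction $(i + c/W)/s$, so only the denominator changes between the two choices. `StopIndexFraction` and `fractionCompleted` are hypothetical names used for illustration.

```java
final class StopIndexFraction {
  private StopIndexFraction() {}

  /**
   * Fraction of an element's work that is done, assuming equal work per window.
   *
   * @param currentWindowIndex index of the window being processed
   * @param windowFraction fraction of the current window's restriction that is done
   * @param stopWindowIndex exclusive end of the windows this element still owns
   */
  static double fractionCompleted(int currentWindowIndex, double windowFraction, int stopWindowIndex) {
    return (currentWindowIndex + windowFraction) / stopWindowIndex;
  }

  public static void main(String[] args) {
    // An element in 4 windows, halfway through window 1. A split has handed
    // windows 2 and 3 to the residual, so the stop index is now 2.
    System.out.println(fractionCompleted(1, 0.5, 2)); // 0.75: only windows 0 and 1 remain ours
    System.out.println(fractionCompleted(1, 0.5, 4)); // 0.375: still counts the split-off windows
  }
}
```

Using the window count keeps reporting work for windows that now belong to the residual, which understates how far along the primary is.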









[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-05 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465902065



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
   Originally, the idea was that we didn't want the SDK to have to perform these calculations, which is why each operator was going to report work_completed/work_remaining if it had them. However, it seems that accurate splitting by fraction needs to take this into account.
   
   Using the graph to compute the progress shouldn't be any more or less difficult than the work that is being put into the SDK.
   
   Is there still value in reporting the work_completed/work_remaining metrics, then?
   









[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-05 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465855425



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
   Yes, truncate exposes some of the issues where a non-root SDF makes things interesting.
   
   For option 2, if we assume that `inprogress` is included in `remaining`, then:
   ```
   total = completed + remaining
   fraction_completed = (inprogress / total) * downstream_fraction_completed + completed / total
   ```
   `downstream_fraction_completed` would be computed recursively, and it could also be computed effectively if there were multiple consumers with `(completed_consumers + downstream_fraction_completed) / total_num_consumers`.
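A minimal sketch of the recursive computation for option 2, assuming `inprogress` is counted inside `remaining` (so its share of the node's work is `inprogress / total`) and that a node with nothing downstream contributes a downstream fraction of 0. `DownstreamAwareProgress`, `Snapshot`, `fractionCompleted` and `combineConsumers` are hypothetical names, not Beam APIs; consumers are assumed to be processed one after another for the multi-consumer formula.

```java
final class DownstreamAwareProgress {
  private DownstreamAwareProgress() {}

  /** Hypothetical per-transform snapshot; remaining already includes inProgress. */
  static final class Snapshot {
    final double completed;
    final double remaining;
    final double inProgress;
    final Snapshot downstream; // null when nothing downstream reports progress

    Snapshot(double completed, double remaining, double inProgress, Snapshot downstream) {
      if (inProgress > remaining) {
        throw new IllegalArgumentException("inProgress must be included in remaining");
      }
      this.completed = completed;
      this.remaining = remaining;
      this.inProgress = inProgress;
      this.downstream = downstream;
    }
  }

  /** fraction_completed = (inprogress / total) * downstream_fraction + completed / total */
  static double fractionCompleted(Snapshot s) {
    double total = s.completed + s.remaining;
    if (total == 0) {
      return 0.0;
    }
    // Recurse into the consumer that is working on the in-progress output.
    double downstreamFraction = s.downstream == null ? 0.0 : fractionCompleted(s.downstream);
    return (s.inProgress / total) * downstreamFraction + s.completed / total;
  }

  /** Folds sequentially processed consumers into a single downstream fraction. */
  static double combineConsumers(
      int completedConsumers, double currentConsumerFraction, int totalConsumers) {
    return (completedConsumers + currentConsumerFraction) / totalConsumers;
  }
}
```

For example, an upstream node with 2 units completed and 2 remaining (1 of them in progress) feeding a consumer that is a quarter done reports 0.5 + 0.25 * 0.25 = 0.5625.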









[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-04 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r465304871



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
   Thinking about this more, I do believe that the progress needs to be reported as a metric so that a runner can choose a split fraction and also compute the amount of remaining work and/or a completion time estimate. It looks like we either:
   1) need to make work completed/remaining take downstream processing into account
   OR
   2) need to add a metric that represents work in progress so that a runner can compute the amount of work being done (without this, we can't figure out how large the remaining downstream work is relative to an upstream node).
   
   I'm not a big fan of 1) since it means that this metric is intrinsically tied to the state of other transforms, while in 2) we are adding something new.










[GitHub] [beam] lukecwik commented on a change in pull request #12430: [BEAM-10303] Scale progress with respect to windows observation.

2020-08-03 Thread GitBox


lukecwik commented on a change in pull request #12430:
URL: https://github.com/apache/beam/pull/12430#discussion_r464673014



##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -515,6 +515,9 @@
           && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
         mainInputConsumer =
             new SplittableFnDataReceiver() {
+              private final HandlesSplits splitDelegate =
+                  (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
   nit: here and below around line 558
   ```suggestion
                  (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
   ```

##
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1029,7 +1040,27 @@ public double getProgress() {
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        Progress progress = ((HasProgress) currentTracker).getProgress();
+        double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+        double completed =
+            totalWork * currentWindowIterator.previousIndex() + progress.getWorkCompleted();
+        double remaining =
+            totalWork * (currentElement.getWindows().size() - currentWindowIterator.nextIndex())
+                + progress.getWorkRemaining();
+        return Progress.from(completed, remaining);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindowIterator != null) {

Review comment:
   Should we register this with `addProgressRequestCallback` so that we generate monitoring infos?
   
   I'm not sure whether truncate should use the downstream progress as part of its calculation when reporting it as a monitoring info. I know that this differs from how we calculate the progress/split point for the SplittableFnDataReceiver, since the singular fraction needs to take the downstream progress into account accurately.
   
   I always envisioned that work completed/work remaining in the monitoring infos represented only the local knowledge of work and didn't take into account any downstream/upstream knowledge. We can avoid this issue if we merge this logic into the `getProgress` method around line 533.




