[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-26 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477506877



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
 return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+  WindowedSplitResult splitResult, String errorContext) {
+double fullSize =
+splitResult.getResidualInUnprocessedWindowsRoot() == null
+&& splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+? 0
+: doFnInvoker.invokeGetSize(
+new DelegatingArgumentProvider(processContext, errorContext) {
+  @Override
+  public Object restriction() {
+return currentRestriction;
+  }
+
+  @Override
+  public RestrictionTracker restrictionTracker() {
+return doFnInvoker.invokeNewTracker(this);
+  }
+});
+double primarySize =
+splitResult.getPrimarySplitRoot() == null
+? 0
+: doFnInvoker.invokeGetSize(
+new DelegatingArgumentProvider(processContext, errorContext) {
+  @Override
+  public Object restriction() {
+return ((KV>) splitResult.getPrimarySplitRoot().getValue())
+.getValue()
+.getKey();
+  }
+
+  @Override
+  public RestrictionTracker restrictionTracker() {
+return doFnInvoker.invokeNewTracker(this);
+  }
+});
+double residualSize =
+splitResult.getResidualSplitRoot() == null
+? 0
+: doFnInvoker.invokeGetSize(
+new DelegatingArgumentProvider(processContext, errorContext) {
+  @Override
+  public Object restriction() {
+return ((KV>) splitResult.getResidualSplitRoot().getValue())
+.getValue()
+.getKey();
+  }
+
+  @Override
+  public RestrictionTracker restrictionTracker() {
+return doFnInvoker.invokeNewTracker(this);
+  }
+});
+return WindowedSplitResult.forRoots(
+splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+? null
+: WindowedValue.of(
+KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+splitResult.getPrimarySplitRoot() == null
+? null
+: WindowedValue.of(
+KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize),
+splitResult.getPrimarySplitRoot().getTimestamp(),
+splitResult.getPrimarySplitRoot().getWindows(),
+splitResult.getPrimarySplitRoot().getPane()),
+splitResult.getResidualSplitRoot() == null
+? null
+: WindowedValue.of(
+KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize),
+splitResult.getResidualSplitRoot().getTimestamp(),
+splitResult.getResidualSplitRoot().getWindows(),
+splitResult.getResidualSplitRoot().getPane()),
+splitResult.getResidualInUnprocessedWindowsRoot() == null
+? null
+: WindowedValue.of(
+KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+  }
+
+  @VisibleForTesting
+  static 
+  KV, Integer> 
trySplitForTruncate(

Review comment:
   I think that since we have all the tests, we can merge it as is and 
explore clean-up options in a follow-up.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-26 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477472322



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java
##
@@ -35,6 +36,12 @@
   /** Returns the current progress of the active element as a fraction between 0.0 and 1.0. */
   double getProgress();
 
+  String getPtranformId();

Review comment:
   I believe we should be using the transform ids from the local transform. 
In the window-observing truncate case, where we have both element splits from 
`process` and window splits from `truncate`:
   * the element splits should use the transform/input id from `process`
   * the whole window splits should use the transform/input id from `truncate`
   
   We may need changes on Dataflow runner v2 to make sure it supports this.
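
A rough sketch of the routing rule described above, in plain Java. Every name here (`SplitRouting`, `SplitKind`, `Target`, `targetFor`, and the id strings in the usage note) is hypothetical and chosen for illustration; none of it is Beam API, and the real change would carry these ids on the split roots themselves.

```java
/** Illustrative only: these types are hypothetical and are not part of Beam. */
final class SplitRouting {
  /** Whether a split root came from splitting an element or from whole windows. */
  enum SplitKind {
    ELEMENT,
    WHOLE_WINDOW
  }

  /** The transform id and input id that a split root is addressed to. */
  static final class Target {
    final String transformId;
    final String inputId;

    Target(String transformId, String inputId) {
      this.transformId = transformId;
      this.inputId = inputId;
    }
  }

  /** Element splits go to the process transform; whole-window splits go to truncate. */
  static Target targetFor(SplitKind kind, Target process, Target truncate) {
    return kind == SplitKind.ELEMENT ? process : truncate;
  }
}
```

With `process = new Target("process-transform", "process-input")` and a matching `truncate` target, `targetFor(SplitKind.WHOLE_WINDOW, process, truncate)` returns the truncate target, so a runner would re-apply whole-window roots at the truncate transform.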
   









[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-26 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477470639



##
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##
@@ -133, +138,4337 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.collection.IsMapContaining;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)

Review comment:
   I did the review; we can disregard this comment.









[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-25 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477022288



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1237,8 +1639,10 @@ public Object restriction() {
   .setTransformId(pTransformId)
   .setInputId(mainInputId)
   .setElement(bytesOut.toByteString());
-  // We don't want to change the output watermarks or set the checkpoint resume time since
-  // that applies to the current window.
+  if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) {

Review comment:
   `putAllOutputWatermarks` should do nothing if the input map is empty, so 
the `if` check is extraneous.
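
A minimal sketch of why the guard is redundant, using a plain `java.util.Map` as a stand-in for the builder's map field. The assumption, taken from the comment above, is that `putAllOutputWatermarks` behaves like `Map.putAll`; `PutAllGuard` and its two methods are hypothetical names for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: a plain map stands in for the builder's watermark map. */
final class PutAllGuard {
  /** Copies {@code extra} into a copy of {@code existing}, guarded by an emptiness check. */
  static Map<String, Long> mergeWithGuard(Map<String, Long> existing, Map<String, Long> extra) {
    Map<String, Long> merged = new HashMap<>(existing);
    if (!extra.isEmpty()) {
      merged.putAll(extra);
    }
    return merged;
  }

  /** Same merge without the guard: putAll with an empty map changes nothing. */
  static Map<String, Long> mergeWithoutGuard(Map<String, Long> existing, Map<String, Long> extra) {
    Map<String, Long> merged = new HashMap<>(existing);
    merged.putAll(extra);
    return merged;
  }
}
```

Both methods return equal maps for every input, including an empty `extra`, which is the sense in which the `if` adds nothing.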

##
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##
@@ -133, +138,4337 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.collection.IsMapContaining;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)

Review comment:
   This is very hard to review. Could we separate out the creation of the 
single enclosed class containing the existing tests into its own commit?

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -247,8 +247,7 @@
   /** Only valid during {@code processElement...} methods, null otherwise. */
   private WindowedValue currentElement;
 
-  /** Only valid during {@link #processElementForSizedElementAndRestriction}. */
-  private ListIterator currentWindowIterator;
+  private List currentWindows;

Review comment:
   Please add a comment stating the lifetime of `currentWindows`.

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java
##
@@ -35,6 +36,12 @@
   /** Returns the current progress of the active element as a fraction between 0.0 and 1.0. */
   double getProgress();
 
+  String getPtranformId();

Review comment:
   I was under the impression that we would be able to pass this 
information forward locally through the method without needing to expose it 
within `HandlesSplits`.

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1237,8 +1639,10 @@ public Object restriction() {
   .setTransformId(pTransformId)
   .setInputId(mainInputId)
   .setElement(bytesOut.toByteString());
-  // We don't want to change the output watermarks or set the checkpoint resume time since
-  // that applies to the current window.
+  if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) {

Review comment:
   We should leave the comment since it still makes sense. We could update 
it to just state that we are using the initial watermark for the output 
watermarks.

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
 return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+  WindowedSplitResult splitResult, String errorContext) {
+double fullSize =
+splitResult.getResidualInUnprocessedWindowsRoot() == null
+&& splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+? 0
+: doFnInvoker.invokeGetSize(
+new DelegatingArgumentProvider(processContext, errorContext) {
+  @Override
+  public Object restriction() {
+return currentRestriction;
+  }
+
+  @Override
+  public RestrictionTracker restrictionTracker() {
+return doFnInvoker.invokeNewTracker(this);
+  }
+});
+double primarySize =
+splitResult.getPrimarySplitRoot() == null
+? 0
+: doFnInvoker.invokeGetSize(
+new DelegatingArgumentProvider(processContext, errorContext) {
+  @Override
+  public Object restriction() {
+return ((KV>) 

[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-03 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464699645



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1193,6 +1328,7 @@ public Object restriction() {
   .setElement(bytesOut.toByteString());
   // We don't want to change the output watermarks or set the checkpoint resume time since
   // that applies to the current window.
+  // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
   Resolved, since this suggestion was not optimal.

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1193,6 +1328,7 @@ public Object restriction() {
   .setElement(bytesOut.toByteString());
   // We don't want to change the output watermarks or set the checkpoint resume time since
   // that applies to the current window.
+  // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
   Resolved, since this suggestion was not right.









[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-03 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464699501



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
 return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+  double fractionOfRemainder, HandlesSplits splitDelegate) {

Review comment:
   Yeah, that would make sense and would allow us to share a bunch of code 
between the truncate split logic and the process sized elements split logic.
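
One piece both split paths appear to need, going by the quoted code comments, is a partition of the element's windows into fully processed, current, and not yet processed. The sketch below shows only that partition in plain Java; `WindowPartition`, its fields, and `at` are hypothetical names, and windows are modelled as strings rather than Beam window types.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: partitions an element's windows around the one being processed. */
final class WindowPartition {
  final List<String> fullyProcessed;
  final String current;
  final List<String> unprocessed;

  private WindowPartition(List<String> fullyProcessed, String current, List<String> unprocessed) {
    this.fullyProcessed = fullyProcessed;
    this.current = current;
    this.unprocessed = unprocessed;
  }

  /** Splits {@code windows} into the parts before, at, and after {@code currentIndex}. */
  static WindowPartition at(List<String> windows, int currentIndex) {
    if (currentIndex < 0 || currentIndex >= windows.size()) {
      throw new IndexOutOfBoundsException("currentIndex: " + currentIndex);
    }
    return new WindowPartition(
        new ArrayList<>(windows.subList(0, currentIndex)),
        windows.get(currentIndex),
        new ArrayList<>(windows.subList(currentIndex + 1, windows.size())));
  }
}
```

A shared helper along these lines would let the truncate path and the process-sized-elements path build their primary and residual roots from the same three pieces, differing only in how the current window is split.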









[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-03 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464699157



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
 return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+  double fractionOfRemainder, HandlesSplits splitDelegate) {
+WindowedValue primaryInFullyProcessedWindowsRoot;
+WindowedValue residualInUnprocessedWindowsRoot;
+// Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+// the same as the SDF/Process transform.
+Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+BundleApplication windowedPrimaryRoot;
+DelayedBundleApplication windowedResidualRoot;
+synchronized (splitLock) {
+  // There is nothing to split if we are between truncate processing calls.
+  if (currentWindowIterator == null) {
+return null;
+  }
+  HandlesSplits.SplitResult splitResult = 
splitDelegate.trySplit(fractionOfRemainder);
+  if (splitResult == null) {
+return null;
+  }
+
+  windowedPrimaryRoot = 
Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+  windowedResidualRoot = 
Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+  // We have a successful split from downstream sdf process.
+  // Convert the split taking into account the processed windows, the 
current window and the
+  // yet to be processed windows.
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows = 
ImmutableList.copyOf(currentWindowIterator);
+  // If the window has been observed then the splitAndSize method would 
have already
+  // output sizes for each window separately.
+  //
+  // TODO: Consider using the original size on the element instead of 
recomputing
+  // this here.
+  double fullSize =
+  primaryFullyProcessedWindows.isEmpty() && 
residualUnprocessedWindows.isEmpty()
+  ? 0
+  : doFnInvoker.invokeGetSize(
+  new DelegatingArgumentProvider(
+  processContext,
+  
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+  + "/GetPrimarySize") {
+@Override
+public Object restriction() {
+  return currentRestriction;
+}
+
+@Override
+public RestrictionTracker restrictionTracker() {
+  return doFnInvoker.invokeNewTracker(this);
+}
+  });
+  primaryInFullyProcessedWindowsRoot =
+  primaryFullyProcessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  fullSize),
+  currentElement.getTimestamp(),
+  primaryFullyProcessedWindows,
+  currentElement.getPane());
+  residualInUnprocessedWindowsRoot =
+  residualUnprocessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  fullSize),
+  currentElement.getTimestamp(),
+  residualUnprocessedWindows,
+  currentElement.getPane());
+}
+
+List primaryRoots = new ArrayList<>();
+List residualRoots = new ArrayList<>();
+
+if (primaryInFullyProcessedWindowsRoot != null) {
+  ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+  try {
+fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, 
primaryInOtherWindowsBytes);
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+  BundleApplication.Builder primaryApplicationInOtherWindows =
+  BundleApplication.newBuilder()
+  .setTransformId(windowedPrimaryRoot.getTransformId())
+  .setInputId(windowedPrimaryRoot.getInputId())
+  .setElement(primaryInOtherWindowsBytes.toByteString());
+  primaryRoots.add(primaryApplicationInOtherWindows.build());
+}
+if (residualInUnprocessedWindowsRoot != null) {
+  ByteString.Output residualInUnprocessedWindowsBytesOut = 

[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

2020-08-03 Thread GitBox


lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464651698



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -515,15 +515,18 @@
   && Iterables.get(mainOutputConsumers, 0) instanceof 
HandlesSplits) {
 mainInputConsumer =
 new SplittableFnDataReceiver() {
+  private final HandlesSplits splitDelegate =
+  (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
   nit: here and below on line 550
   ```suggestion
 (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
   ```
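
The point of the suggestion, sketched in plain Java: taking the first element silently ignores any extras, while an "only element" accessor fails fast when the size assumption is wrong. `OnlyElement`, `first`, and `onlyElement` are hypothetical stand-ins written for this illustration; `onlyElement` mirrors the documented behaviour of Guava's `Iterables.getOnlyElement` (`NoSuchElementException` when empty, `IllegalArgumentException` when there is more than one element).

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Illustrative only: plain-Java stand-ins, not the Guava implementation. */
final class OnlyElement {
  /** Returns the first element and ignores anything after it. */
  static <T> T first(Iterable<T> iterable) {
    return iterable.iterator().next();
  }

  /** Returns the single element, failing if the iterable is empty or has more than one. */
  static <T> T onlyElement(Iterable<T> iterable) {
    Iterator<T> iterator = iterable.iterator();
    if (!iterator.hasNext()) {
      throw new NoSuchElementException("expected one element but found none");
    }
    T element = iterator.next();
    if (iterator.hasNext()) {
      throw new IllegalArgumentException("expected one element but found more");
    }
    return element;
  }
}
```

Since the surrounding code already checks that there is a single main output consumer, the stricter accessor documents that invariant at the call site.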

##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
 return null;
   }
 
+  private HandlesSplits.SplitResult 
trySplitForWindowObservingTruncateRestriction(
+  double fractionOfRemainder, HandlesSplits splitDelegate) {
+WindowedValue primaryInFullyProcessedWindowsRoot;
+WindowedValue residualInUnprocessedWindowsRoot;
+// Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+// the same as the SDF/Process transform.
+Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+BundleApplication windowedPrimaryRoot;
+DelayedBundleApplication windowedResidualRoot;
+synchronized (splitLock) {
+  // There is nothing to split if we are between truncate processing calls.
+  if (currentWindowIterator == null) {
+return null;
+  }
+  HandlesSplits.SplitResult splitResult = 
splitDelegate.trySplit(fractionOfRemainder);
+  if (splitResult == null) {
+return null;
+  }
+
+  windowedPrimaryRoot = 
Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+  windowedResidualRoot = 
Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+  // We have a successful split from downstream sdf process.
+  // Convert the split taking into account the processed windows, the 
current window and the
+  // yet to be processed windows.
+  List primaryFullyProcessedWindows =
+  ImmutableList.copyOf(
+  Iterables.limit(currentElement.getWindows(), 
currentWindowIterator.previousIndex()));
+  // Advances the iterator consuming the remaining windows.
+  List residualUnprocessedWindows = 
ImmutableList.copyOf(currentWindowIterator);
+  // If the window has been observed then the splitAndSize method would 
have already
+  // output sizes for each window separately.
+  //
+  // TODO: Consider using the original size on the element instead of 
recomputing
+  // this here.
+  double fullSize =
+  primaryFullyProcessedWindows.isEmpty() && 
residualUnprocessedWindows.isEmpty()
+  ? 0
+  : doFnInvoker.invokeGetSize(
+  new DelegatingArgumentProvider(
+  processContext,
+  
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+  + "/GetPrimarySize") {
+@Override
+public Object restriction() {
+  return currentRestriction;
+}
+
+@Override
+public RestrictionTracker restrictionTracker() {
+  return doFnInvoker.invokeNewTracker(this);
+}
+  });
+  primaryInFullyProcessedWindowsRoot =
+  primaryFullyProcessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  fullSize),
+  currentElement.getTimestamp(),
+  primaryFullyProcessedWindows,
+  currentElement.getPane());
+  residualInUnprocessedWindowsRoot =
+  residualUnprocessedWindows.isEmpty()
+  ? null
+  : WindowedValue.of(
+  KV.of(
+  KV.of(
+  currentElement.getValue(),
+  KV.of(currentRestriction, 
currentWatermarkEstimatorState)),
+  fullSize),
+  currentElement.getTimestamp(),
+  residualUnprocessedWindows,
+  currentElement.getPane());
+}
+
+List primaryRoots = new ArrayList<>();
+List residualRoots = new ArrayList<>();
+
+if (primaryInFullyProcessedWindowsRoot != null) {
+  ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+  try {
+fullInputCoder.encode(primaryInFullyProcessedWindowsRoot,