[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-21 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r428754395



##
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##
@@ -947,49 +910,213 @@ public void testTimers() throws Exception {
 timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)),
 timerInGlobalWindow("X", new Instant(1700L), new Instant(10022L)),
 timerInGlobalWindow("C", new Instant(1800L), new Instant(10022L)),
-timerInGlobalWindow("B", new Instant(1900L), new 
Instant(10022L;
+timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L)),
+timerInGlobalWindow("B", new Instant(2000L), new Instant(10032L)),
+timerInGlobalWindow("Y", new Instant(2100L), new 
Instant(10042L;
+assertThat(
+fakeTimerClient.getTimers(eventFamilyTimer),
+contains(
+timerInGlobalWindow("X", "event-timer1", new Instant(1000L), new 
Instant(1003L)),
+timerInGlobalWindow("Y", "event-timer1", new Instant(1100L), new 
Instant(1103L)),
+timerInGlobalWindow("X", "event-timer1", new Instant(1200L), new 
Instant(1203L)),
+timerInGlobalWindow("Y", "event-timer1", new Instant(1300L), new 
Instant(1303L)),
+timerInGlobalWindow("A", "event-timer1", new Instant(1400L), new 
Instant(2413L)),
+timerInGlobalWindow("B", "event-timer1", new Instant(1500L), new 
Instant(2513L)),
+timerInGlobalWindow("A", "event-timer1", new Instant(1600L), new 
Instant(2613L)),
+timerInGlobalWindow("X", "event-timer1", new Instant(1700L), new 
Instant(1723L)),
+timerInGlobalWindow("C", "event-timer1", new Instant(1800L), new 
Instant(1823L)),
+timerInGlobalWindow("B", "event-timer1", new Instant(1900L), new 
Instant(1923L)),
+timerInGlobalWindow("B", "event-timer1", new Instant(2000L), new 
Instant(2033L)),
+timerInGlobalWindow("Y", "event-timer1", new Instant(2100L), new 
Instant(2143L;
+assertThat(
+fakeTimerClient.getTimers(processingFamilyTimer),
+contains(
+timerInGlobalWindow("X", "processing-timer1", new Instant(1000L), 
new Instant(10004L)),
+timerInGlobalWindow("Y", "processing-timer1", new Instant(1100L), 
new Instant(10004L)),
+timerInGlobalWindow("X", "processing-timer1", new Instant(1200L), 
new Instant(10004L)),
+timerInGlobalWindow("Y", "processing-timer1", new Instant(1300L), 
new Instant(10004L)),
+timerInGlobalWindow("A", "processing-timer1", new Instant(1400L), 
new Instant(10014L)),
+timerInGlobalWindow("B", "processing-timer1", new Instant(1500L), 
new Instant(10014L)),
+timerInGlobalWindow("A", "processing-timer1", new Instant(1600L), 
new Instant(10014L)),
+timerInGlobalWindow("X", "processing-timer1", new Instant(1700L), 
new Instant(10024L)),
+timerInGlobalWindow("C", "processing-timer1", new Instant(1800L), 
new Instant(10024L)),
+timerInGlobalWindow("B", "processing-timer1", new Instant(1900L), 
new Instant(10024L)),
+timerInGlobalWindow("B", "processing-timer1", new Instant(2000L), 
new Instant(10034L)),
+timerInGlobalWindow(
+"Y", "processing-timer1", new Instant(2100L), new 
Instant(10044L;
 mainOutputValues.clear();
 
 assertFalse(fakeTimerClient.isOutboundClosed(eventTimer));
 assertFalse(fakeTimerClient.isOutboundClosed(processingTimer));
+assertFalse(fakeTimerClient.isOutboundClosed(eventFamilyTimer));
+assertFalse(fakeTimerClient.isOutboundClosed(processingFamilyTimer));
 fakeTimerClient.closeInbound(eventTimer);
 fakeTimerClient.closeInbound(processingTimer);
+fakeTimerClient.closeInbound(eventFamilyTimer);
+fakeTimerClient.closeInbound(processingFamilyTimer);
 
 Iterables.getOnlyElement(finishFunctionRegistry.getFunctions()).run();
 assertThat(mainOutputValues, empty());
 
 assertTrue(fakeTimerClient.isOutboundClosed(eventTimer));
 assertTrue(fakeTimerClient.isOutboundClosed(processingTimer));
+assertTrue(fakeTimerClient.isOutboundClosed(eventFamilyTimer));
+assertTrue(fakeTimerClient.isOutboundClosed(processingFamilyTimer));
 
 Iterables.getOnlyElement(teardownFunctions).run();
 assertThat(mainOutputValues, empty());
 
 assertEquals(
 ImmutableMap.builder()
 .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2", 
"processing"))
-.put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2"))
+.put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2", 
"processing-family"))
 .put(bagUserStateKey("bag", "A"), encode("A0", "event", "event"))
-.put(bagUserStateKey("bag", "B"), encode("event", "processing"))
+.put(bagUserStateKey("bag", "B"), encode("event", "

[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-20 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r428242655



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception {
   // Extract out relevant TimerFamilySpec information in preparation for 
execution.
   for (Map.Entry entry :
   parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-String timerFamilyId = entry.getKey();
-TimeDomain timeDomain =
-DoFnSignatures.getTimerSpecOrThrow(
-doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-.getTimeDomain();
+String timerIdOrTimerFamilyId = entry.getKey();
+TimeDomain timeDomain;
+if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+  timeDomain =

Review comment:
   done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-20 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r428241263



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
 }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+private final String timerFamilyId;
+private final K userKey;
+private final TimeDomain timeDomain;
+private final Instant elementTimestampOrTimerHoldTimestamp;
+private final Instant elementTimestampOrTimerFireTimestamp;
+private final BoundedWindow boundedWindow;
+private final PaneInfo paneInfo;
+
+FnApiTimerMap(
+String timerFamilyId,
+K userKey,
+BoundedWindow boundedWindow,
+Instant elementTimestampOrTimerHoldTimestamp,
+Instant elementTimestampOrTimerFireTimestamp,
+PaneInfo paneInfo) {
+  this.timerFamilyId = timerFamilyId;
+  this.userKey = userKey;
+  this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+  this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+  this.boundedWindow = boundedWindow;
+  this.paneInfo = paneInfo;
+
+  TimerFamilyDeclaration timerFamilyDeclaration =
+  doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+  this.timeDomain =

Review comment:
   done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-20 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r428241083



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception {
   // Extract out relevant TimerFamilySpec information in preparation for 
execution.
   for (Map.Entry entry :
   parDoPayload.getTimerFamilySpecsMap().entrySet()) {
-String timerFamilyId = entry.getKey();
-TimeDomain timeDomain =
-DoFnSignatures.getTimerSpecOrThrow(
-doFnSignature.timerDeclarations().get(timerFamilyId), doFn)
-.getTimeDomain();
+String timerIdOrTimerFamilyId = entry.getKey();
+TimeDomain timeDomain;
+if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) {
+  timeDomain =
+  DoFnSignatures.getTimerFamilySpecOrThrow(
+  
doFnSignature.timerFamilyDeclarations().get(timerIdOrTimerFamilyId), doFn)
+  .getTimeDomain();
+} else {
+  timeDomain =
+  DoFnSignatures.getTimerSpecOrThrow(
+  
doFnSignature.timerDeclarations().get(timerIdOrTimerFamilyId), doFn)
+  .getTimeDomain();
+}
 Coder> timerCoder =
 (Coder) 
rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId());
-timerFamilyInfosBuilder.put(timerFamilyId, KV.of(timeDomain, 
timerCoder));
+timerFamilyInfosBuilder.put(timerIdOrTimerFamilyId, KV.of(timeDomain, 
timerCoder));

Review comment:
   done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427759019



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
 }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+private final String timerFamilyId;
+private final K userKey;
+private final TimeDomain timeDomain;
+private final Instant elementTimestampOrTimerHoldTimestamp;
+private final Instant elementTimestampOrTimerFireTimestamp;
+private final BoundedWindow boundedWindow;
+private final PaneInfo paneInfo;
+
+FnApiTimerMap(
+String timerFamilyId,
+K userKey,
+BoundedWindow boundedWindow,
+Instant elementTimestampOrTimerHoldTimestamp,
+Instant elementTimestampOrTimerFireTimestamp,
+PaneInfo paneInfo) {
+  this.timerFamilyId = timerFamilyId;
+  this.userKey = userKey;
+  this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+  this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+  this.boundedWindow = boundedWindow;
+  this.paneInfo = paneInfo;
+
+  TimerFamilyDeclaration timerFamilyDeclaration =
+  doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+  this.timeDomain =
+  DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, 
doFn).getTimeDomain();
+}
 
 @Override
-public void set(String timerId, Instant absoluteTime) {}
+public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   probably makes sense to keep the set shortcut since it is the most 
frequently used one.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427758607



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
 }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+private final String timerFamilyId;
+private final K userKey;
+private final TimeDomain timeDomain;
+private final Instant elementTimestampOrTimerHoldTimestamp;
+private final Instant elementTimestampOrTimerFireTimestamp;
+private final BoundedWindow boundedWindow;
+private final PaneInfo paneInfo;
+
+FnApiTimerMap(
+String timerFamilyId,
+K userKey,
+BoundedWindow boundedWindow,
+Instant elementTimestampOrTimerHoldTimestamp,
+Instant elementTimestampOrTimerFireTimestamp,
+PaneInfo paneInfo) {
+  this.timerFamilyId = timerFamilyId;
+  this.userKey = userKey;
+  this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+  this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+  this.boundedWindow = boundedWindow;
+  this.paneInfo = paneInfo;
+
+  TimerFamilyDeclaration timerFamilyDeclaration =
+  doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+  this.timeDomain =
+  DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, 
doFn).getTimeDomain();
+}
 
 @Override
-public void set(String timerId, Instant absoluteTime) {}
+public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   This is the interface required by TimerMap though: 
https://github.com/apache/beam/blob/9108832cf1cb57161997e16190dbc6eccdc10492/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerMap.java#L25





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427738041



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) {
 }
   }
 
-  private static class FnApiTimerMap implements TimerMap {
-FnApiTimerMap() {}
+  private class FnApiTimerMap implements TimerMap {
+private final String timerFamilyId;
+private final K userKey;
+private final TimeDomain timeDomain;
+private final Instant elementTimestampOrTimerHoldTimestamp;
+private final Instant elementTimestampOrTimerFireTimestamp;
+private final BoundedWindow boundedWindow;
+private final PaneInfo paneInfo;
+
+FnApiTimerMap(
+String timerFamilyId,
+K userKey,
+BoundedWindow boundedWindow,
+Instant elementTimestampOrTimerHoldTimestamp,
+Instant elementTimestampOrTimerFireTimestamp,
+PaneInfo paneInfo) {
+  this.timerFamilyId = timerFamilyId;
+  this.userKey = userKey;
+  this.elementTimestampOrTimerHoldTimestamp = 
elementTimestampOrTimerHoldTimestamp;
+  this.elementTimestampOrTimerFireTimestamp = 
elementTimestampOrTimerFireTimestamp;
+  this.boundedWindow = boundedWindow;
+  this.paneInfo = paneInfo;
+
+  TimerFamilyDeclaration timerFamilyDeclaration =
+  doFnSignature.timerFamilyDeclarations().get(timerFamilyId);
+  this.timeDomain =
+  DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, 
doFn).getTimeDomain();
+}
 
 @Override
-public void set(String timerId, Instant absoluteTime) {}
+public void set(String dynamicTimerTag, Instant absoluteTime) {

Review comment:
   I believe we can always call the get() first to access FnApiTimer and 
call it's APIs. Probably that's sufficient enough? I feel adding more shortcuts 
only makes the API slightly more user-friendly.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner

2020-05-19 Thread GitBox


y1chi commented on a change in pull request #11756:
URL: https://github.com/apache/beam/pull/11756#discussion_r427737289



##
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##
@@ -962,16 +971,25 @@ private Progress getProgress() {
 .build());
   }
 
-  private  void processTimer(String timerId, TimeDomain timeDomain, 
Timer timer) {
+  private  void processTimer(
+  String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) {
 currentTimer = timer;
 currentTimeDomain = timeDomain;
 onTimerContext = new OnTimerContext<>(timer.getUserKey());
+String timerId =
+timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)

Review comment:
   it will be ignored anyway, apparently only one of timerId or the 
timerFamilyId takes effect.
   
https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L223





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org