[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r428754395 ## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java ## @@ -947,49 +910,213 @@ public void testTimers() throws Exception { timerInGlobalWindow("A", new Instant(1600L), new Instant(10012L)), timerInGlobalWindow("X", new Instant(1700L), new Instant(10022L)), timerInGlobalWindow("C", new Instant(1800L), new Instant(10022L)), -timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L; +timerInGlobalWindow("B", new Instant(1900L), new Instant(10022L)), +timerInGlobalWindow("B", new Instant(2000L), new Instant(10032L)), +timerInGlobalWindow("Y", new Instant(2100L), new Instant(10042L; +assertThat( +fakeTimerClient.getTimers(eventFamilyTimer), +contains( +timerInGlobalWindow("X", "event-timer1", new Instant(1000L), new Instant(1003L)), +timerInGlobalWindow("Y", "event-timer1", new Instant(1100L), new Instant(1103L)), +timerInGlobalWindow("X", "event-timer1", new Instant(1200L), new Instant(1203L)), +timerInGlobalWindow("Y", "event-timer1", new Instant(1300L), new Instant(1303L)), +timerInGlobalWindow("A", "event-timer1", new Instant(1400L), new Instant(2413L)), +timerInGlobalWindow("B", "event-timer1", new Instant(1500L), new Instant(2513L)), +timerInGlobalWindow("A", "event-timer1", new Instant(1600L), new Instant(2613L)), +timerInGlobalWindow("X", "event-timer1", new Instant(1700L), new Instant(1723L)), +timerInGlobalWindow("C", "event-timer1", new Instant(1800L), new Instant(1823L)), +timerInGlobalWindow("B", "event-timer1", new Instant(1900L), new Instant(1923L)), +timerInGlobalWindow("B", "event-timer1", new Instant(2000L), new Instant(2033L)), +timerInGlobalWindow("Y", "event-timer1", new Instant(2100L), new Instant(2143L; +assertThat( +fakeTimerClient.getTimers(processingFamilyTimer), +contains( +timerInGlobalWindow("X", "processing-timer1", new Instant(1000L), new Instant(10004L)), +timerInGlobalWindow("Y", "processing-timer1", new Instant(1100L), new Instant(10004L)), +timerInGlobalWindow("X", "processing-timer1", new Instant(1200L), new Instant(10004L)), +timerInGlobalWindow("Y", "processing-timer1", new Instant(1300L), new Instant(10004L)), +timerInGlobalWindow("A", "processing-timer1", new Instant(1400L), new Instant(10014L)), +timerInGlobalWindow("B", "processing-timer1", new Instant(1500L), new Instant(10014L)), +timerInGlobalWindow("A", "processing-timer1", new Instant(1600L), new Instant(10014L)), +timerInGlobalWindow("X", "processing-timer1", new Instant(1700L), new Instant(10024L)), +timerInGlobalWindow("C", "processing-timer1", new Instant(1800L), new Instant(10024L)), +timerInGlobalWindow("B", "processing-timer1", new Instant(1900L), new Instant(10024L)), +timerInGlobalWindow("B", "processing-timer1", new Instant(2000L), new Instant(10034L)), +timerInGlobalWindow( +"Y", "processing-timer1", new Instant(2100L), new Instant(10044L; mainOutputValues.clear(); assertFalse(fakeTimerClient.isOutboundClosed(eventTimer)); assertFalse(fakeTimerClient.isOutboundClosed(processingTimer)); +assertFalse(fakeTimerClient.isOutboundClosed(eventFamilyTimer)); +assertFalse(fakeTimerClient.isOutboundClosed(processingFamilyTimer)); fakeTimerClient.closeInbound(eventTimer); fakeTimerClient.closeInbound(processingTimer); +fakeTimerClient.closeInbound(eventFamilyTimer); +fakeTimerClient.closeInbound(processingFamilyTimer); Iterables.getOnlyElement(finishFunctionRegistry.getFunctions()).run(); assertThat(mainOutputValues, empty()); assertTrue(fakeTimerClient.isOutboundClosed(eventTimer)); assertTrue(fakeTimerClient.isOutboundClosed(processingTimer)); +assertTrue(fakeTimerClient.isOutboundClosed(eventFamilyTimer)); +assertTrue(fakeTimerClient.isOutboundClosed(processingFamilyTimer)); Iterables.getOnlyElement(teardownFunctions).run(); assertThat(mainOutputValues, empty()); assertEquals( ImmutableMap.builder() .put(bagUserStateKey("bag", "X"), encode("X0", "X1", "X2", "processing")) -.put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2")) +.put(bagUserStateKey("bag", "Y"), encode("Y1", "Y2", "processing-family")) .put(bagUserStateKey("bag", "A"), encode("A0", "event", "event")) -.put(bagUserStateKey("bag", "B"), encode("event", "processing")) +.put(bagUserStateKey("bag", "B"), encode("event", "
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r428242655 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception { // Extract out relevant TimerFamilySpec information in preparation for execution. for (Map.Entry entry : parDoPayload.getTimerFamilySpecsMap().entrySet()) { -String timerFamilyId = entry.getKey(); -TimeDomain timeDomain = -DoFnSignatures.getTimerSpecOrThrow( -doFnSignature.timerDeclarations().get(timerFamilyId), doFn) -.getTimeDomain(); +String timerIdOrTimerFamilyId = entry.getKey(); +TimeDomain timeDomain; +if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) { + timeDomain = Review comment: done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r428241263 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) { } } - private static class FnApiTimerMap implements TimerMap { -FnApiTimerMap() {} + private class FnApiTimerMap implements TimerMap { +private final String timerFamilyId; +private final K userKey; +private final TimeDomain timeDomain; +private final Instant elementTimestampOrTimerHoldTimestamp; +private final Instant elementTimestampOrTimerFireTimestamp; +private final BoundedWindow boundedWindow; +private final PaneInfo paneInfo; + +FnApiTimerMap( +String timerFamilyId, +K userKey, +BoundedWindow boundedWindow, +Instant elementTimestampOrTimerHoldTimestamp, +Instant elementTimestampOrTimerFireTimestamp, +PaneInfo paneInfo) { + this.timerFamilyId = timerFamilyId; + this.userKey = userKey; + this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; + this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp; + this.boundedWindow = boundedWindow; + this.paneInfo = paneInfo; + + TimerFamilyDeclaration timerFamilyDeclaration = + doFnSignature.timerFamilyDeclarations().get(timerFamilyId); + this.timeDomain = Review comment: done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r428241083 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -460,14 +461,22 @@ public void accept(WindowedValue input) throws Exception { // Extract out relevant TimerFamilySpec information in preparation for execution. for (Map.Entry entry : parDoPayload.getTimerFamilySpecsMap().entrySet()) { -String timerFamilyId = entry.getKey(); -TimeDomain timeDomain = -DoFnSignatures.getTimerSpecOrThrow( -doFnSignature.timerDeclarations().get(timerFamilyId), doFn) -.getTimeDomain(); +String timerIdOrTimerFamilyId = entry.getKey(); +TimeDomain timeDomain; +if (timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX)) { + timeDomain = + DoFnSignatures.getTimerFamilySpecOrThrow( + doFnSignature.timerFamilyDeclarations().get(timerIdOrTimerFamilyId), doFn) + .getTimeDomain(); +} else { + timeDomain = + DoFnSignatures.getTimerSpecOrThrow( + doFnSignature.timerDeclarations().get(timerIdOrTimerFamilyId), doFn) + .getTimeDomain(); +} Coder> timerCoder = (Coder) rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId()); -timerFamilyInfosBuilder.put(timerFamilyId, KV.of(timeDomain, timerCoder)); +timerFamilyInfosBuilder.put(timerIdOrTimerFamilyId, KV.of(timeDomain, timerCoder)); Review comment: done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427759019 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) { } } - private static class FnApiTimerMap implements TimerMap { -FnApiTimerMap() {} + private class FnApiTimerMap implements TimerMap { +private final String timerFamilyId; +private final K userKey; +private final TimeDomain timeDomain; +private final Instant elementTimestampOrTimerHoldTimestamp; +private final Instant elementTimestampOrTimerFireTimestamp; +private final BoundedWindow boundedWindow; +private final PaneInfo paneInfo; + +FnApiTimerMap( +String timerFamilyId, +K userKey, +BoundedWindow boundedWindow, +Instant elementTimestampOrTimerHoldTimestamp, +Instant elementTimestampOrTimerFireTimestamp, +PaneInfo paneInfo) { + this.timerFamilyId = timerFamilyId; + this.userKey = userKey; + this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; + this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp; + this.boundedWindow = boundedWindow; + this.paneInfo = paneInfo; + + TimerFamilyDeclaration timerFamilyDeclaration = + doFnSignature.timerFamilyDeclarations().get(timerFamilyId); + this.timeDomain = + DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain(); +} @Override -public void set(String timerId, Instant absoluteTime) {} +public void set(String dynamicTimerTag, Instant absoluteTime) { Review comment: probably makes sense to keep the set shortcut since it is the most frequently used one. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427758607 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) { } } - private static class FnApiTimerMap implements TimerMap { -FnApiTimerMap() {} + private class FnApiTimerMap implements TimerMap { +private final String timerFamilyId; +private final K userKey; +private final TimeDomain timeDomain; +private final Instant elementTimestampOrTimerHoldTimestamp; +private final Instant elementTimestampOrTimerFireTimestamp; +private final BoundedWindow boundedWindow; +private final PaneInfo paneInfo; + +FnApiTimerMap( +String timerFamilyId, +K userKey, +BoundedWindow boundedWindow, +Instant elementTimestampOrTimerHoldTimestamp, +Instant elementTimestampOrTimerFireTimestamp, +PaneInfo paneInfo) { + this.timerFamilyId = timerFamilyId; + this.userKey = userKey; + this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; + this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp; + this.boundedWindow = boundedWindow; + this.paneInfo = paneInfo; + + TimerFamilyDeclaration timerFamilyDeclaration = + doFnSignature.timerFamilyDeclarations().get(timerFamilyId); + this.timeDomain = + DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain(); +} @Override -public void set(String timerId, Instant absoluteTime) {} +public void set(String dynamicTimerTag, Instant absoluteTime) { Review comment: This is the interface required by TimerMap though: https://github.com/apache/beam/blob/9108832cf1cb57161997e16190dbc6eccdc10492/sdks/java/core/src/main/java/org/apache/beam/sdk/state/TimerMap.java#L25 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427738041 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -1207,15 +1224,51 @@ private void output(Instant scheduledTime) { } } - private static class FnApiTimerMap implements TimerMap { -FnApiTimerMap() {} + private class FnApiTimerMap implements TimerMap { +private final String timerFamilyId; +private final K userKey; +private final TimeDomain timeDomain; +private final Instant elementTimestampOrTimerHoldTimestamp; +private final Instant elementTimestampOrTimerFireTimestamp; +private final BoundedWindow boundedWindow; +private final PaneInfo paneInfo; + +FnApiTimerMap( +String timerFamilyId, +K userKey, +BoundedWindow boundedWindow, +Instant elementTimestampOrTimerHoldTimestamp, +Instant elementTimestampOrTimerFireTimestamp, +PaneInfo paneInfo) { + this.timerFamilyId = timerFamilyId; + this.userKey = userKey; + this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp; + this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp; + this.boundedWindow = boundedWindow; + this.paneInfo = paneInfo; + + TimerFamilyDeclaration timerFamilyDeclaration = + doFnSignature.timerFamilyDeclarations().get(timerFamilyId); + this.timeDomain = + DoFnSignatures.getTimerFamilySpecOrThrow(timerFamilyDeclaration, doFn).getTimeDomain(); +} @Override -public void set(String timerId, Instant absoluteTime) {} +public void set(String dynamicTimerTag, Instant absoluteTime) { Review comment: I believe we can always call the get() first to access FnApiTimer and call it's APIs. Probably that's sufficient enough? I feel adding more shortcuts only makes the API slightly more user-friendly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [beam] y1chi commented on a change in pull request #11756: [BEAM-9603] Add timer family support to FnApiDoFnRunner
y1chi commented on a change in pull request #11756: URL: https://github.com/apache/beam/pull/11756#discussion_r427737289 ## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java ## @@ -962,16 +971,25 @@ private Progress getProgress() { .build()); } - private void processTimer(String timerId, TimeDomain timeDomain, Timer timer) { + private void processTimer( + String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer timer) { currentTimer = timer; currentTimeDomain = timeDomain; onTimerContext = new OnTimerContext<>(timer.getUserKey()); +String timerId = +timerIdOrTimerFamilyId.startsWith(TimerFamilyDeclaration.PREFIX) Review comment: it will be ignored anyway, apparently only one of timerId or the timerFamilyId takes effect. https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java#L223 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org