Re: @RequiresTimeSortedInput adoption by runners

2024-01-20 Thread Robert Burke
Inline

On Sat, Jan 20, 2024, 9:15 AM Jan Lukavský  wrote:

> On 1/19/24 22:49, Robert Bradshaw via dev wrote:
>
> I think this standard design could still be made to work.
> Specifically, the graph would contain a DoFn that has the
> RequiresTimeSortedInput bit set, and as a single "subtransform" that
> has a different DoFn in its spec that does not require this bit to be
> set and whose implementation enforces this ordering (say, via state)
> before invoking the user's DoFn. This would work fine in Streaming for
> any runner, and would work OK for batch as long as the value set for
> any key fit into memory (or the batch state implementation spilled to
> disk, though that could get really slow). Runners that wanted to do
> better (e.g. provide timestamp sorting as part of their batch
> grouping, or even internally sort timestamps more efficiently than
> could be done via the SDK over the state API) could do so.
>
> Yes, this should work fine for portable runners outside Java.
>
> For Java, such a wrapper might be a bit messy, but could probably be
> hard coded above the ByteBuddy wrappers layer.
>
> +1
> Maybe we can delegate this to an (internal) annotation that would enable
> DoFns to define a subclass of DoFnInvokerFactory.
> E.g.
>
> class MyDoFn extends DoFn<> {
>   @DoFnInvokerFactory
>   MyDoFnInvokerFactory createDoFnInvokerFactory() {
>     ...
>   }
> }
>
> TBD how much of our infrastructure assumes ParDo transforms do not
> contain subtransforms. (We could also provide a different URN for
> RequiresTimeSortedInput DoFns whose payload would be the DoFn payload,
> rather than setting a bit on the payload itself.) Rather than
> introducing nesting, we could implement the AnyOf PTransform that
> would present the two implementations as siblings (which could be
> useful elsewhere). This can be made backward compatible by providing
> one of the alternatives as the composite structure. The primary
> hesitation I have here is that it prevents much
> introspection/manipulation of the pipeline before the runner
> capabilities are known.
>
> What we really want is a way to annotate a DoFn as
> RequestsTimeSortedInput, together with a way for the runner to
> communicate to the SDK whether or not it was able to honor this
> request. That may be a more invasive change to the protocol (e.g.
> annotating PCollections with ordering properties, which is where it
> belongs[1]). I suppose we could let a runner that supports this
> capability strip the RequestsTimeSortedInput bit (or set a new bit),
> and SDKs that get unmutated transforms would know they have to do the
> sorting themselves.
>
> That sounds more like runners that are able to provide the sorting
> themselves would have a manual override for the sorted ParDo (be it via a
> bit or specific URN), no?
>

While offhand it feels like a backwards-incompatible change, the
runner and SDK could have a pair of capabilities: the SDK saying it has the
capability for the backup/alternative option, and the runner reporting to
workers that it doesn't have the capability for the annotation.

I don't like the explicit negative capability though. Better to have the
runner explicitly report a capability for sorted input so the SDK can avoid
repeating any sorting/buffering work, possibly as a "V2" of any existing
requirement or capabilities...

 Jan
>
> - Robert
>
> [1] Ordering is an under-defined concept in Beam, but if we're going
> to add it my take would be that to do it properly one would want
>
> (1) Annotations on PCollections indicating whether they're unordered
> or ordered (by a certain ordering criteria, in this case
> timestamp-within-key), which could be largely inferred by
> (2) Annotations on PTransforms indicating whether they're
> order-creating, order-preserving, or order-requiring (with the default
> being unspecified=order-destroying), again parameterized by an
> ordering criterion of some kind, which criteria could form a hierarchy.
>
>
Re: @RequiresTimeSortedInput adoption by runners

2024-01-20 Thread Jan Lukavský

On 1/19/24 22:49, Robert Bradshaw via dev wrote:

I think this standard design could still be made to work.
Specifically, the graph would contain a DoFn that has the
RequiresTimeSortedInput bit set, and as a single "subtransform" that
has a different DoFn in its spec that does not require this bit to be
set and whose implementation enforces this ordering (say, via state)
before invoking the user's DoFn. This would work fine in Streaming for
any runner, and would work OK for batch as long as the value set for
any key fit into memory (or the batch state implementation spilled to
disk, though that could get really slow). Runners that wanted to do
better (e.g. provide timestamp sorting as part of their batch
grouping, or even internally sort timestamps more efficiently than
could be done via the SDK over the state API) could do so.

Yes, this should work fine for portable runners outside Java.


For Java, such a wrapper might be a bit messy, but could probably be
hard coded above the ByteBuddy wrappers layer.

+1
Maybe we can delegate this to an (internal) annotation that would enable 
DoFns to define a subclass of DoFnInvokerFactory.

E.g.

class MyDoFn extends DoFn<> {
  @DoFnInvokerFactory
  MyDoFnInvokerFactory createDoFnInvokerFactory() {
    ...
  }
}

TBD how much of our infrastructure assumes ParDo transforms do not
contain subtransforms. (We could also provide a different URN for
RequiresTimeSortedInput DoFns whose payload would be the DoFn payload,
rather than setting a bit on the payload itself.) Rather than
introducing nesting, we could implement the AnyOf PTransform that
would present the two implementations as siblings (which could be
useful elsewhere). This can be made backward compatible by providing
one of the alternatives as the composite structure. The primary
hesitation I have here is that it prevents much
introspection/manipulation of the pipeline before the runner
capabilities are known.

What we really want is a way to annotate a DoFn as
RequestsTimeSortedInput, together with a way for the runner to
communicate to the SDK whether or not it was able to honor this
request. That may be a more invasive change to the protocol (e.g.
annotating PCollections with ordering properties, which is where it
belongs[1]). I suppose we could let a runner that supports this
capability strip the RequestsTimeSortedInput bit (or set a new bit),
and SDKs that get unmutated transforms would know they have to do the
sorting themselves.
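As a concrete (if simplified) illustration of that negotiation, the SDK-side decision could look like the following plain-Java sketch. The capability URN is hypothetical, invented here for illustration; the real protocol plumbing would go through the pipeline/environment protos.

```java
import java.util.Set;

public class SortCapabilityCheck {
    // Hypothetical capability URN for illustration; not an actual Beam constant.
    static final String TIME_SORTED = "beam:capability:requires_time_sorted_input:v1";

    /** SDK-side decision: do our own sorting only if the runner did not claim the capability. */
    static boolean sdkMustSort(boolean requiresTimeSortedInput, Set<String> runnerCapabilities) {
        return requiresTimeSortedInput && !runnerCapabilities.contains(TIME_SORTED);
    }

    public static void main(String[] args) {
        // Runner reports nothing: the SDK wraps the DoFn with its own buffering/sorting.
        System.out.println(sdkMustSort(true, Set.of()));
        // Runner claims the capability: the SDK executes the DoFn directly.
        System.out.println(sdkMustSort(true, Set.of(TIME_SORTED)));
    }
}
```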


That sounds more like runners that are able to provide the sorting 
themselves would have a manual override for the sorted ParDo (be it via 
a bit or specific URN), no?


 Jan



- Robert

[1] Ordering is an under-defined concept in Beam, but if we're going
to add it my take would be that to do it properly one would want

(1) Annotations on PCollections indicating whether they're unordered
or ordered (by a certain ordering criteria, in this case
timestamp-within-key), which could be largely inferred by
(2) Annotations on PTransforms indicating whether they're
order-creating, order-preserving, or order-requiring (with the default
being unspecified=order-destroying), again parameterized by an
ordering criterion of some kind, which criteria could form a hierarchy.
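A rough sketch of how (2) could drive (1): an output PCollection's ordering is a function of its input's ordering and the transform's declared behavior. The enum names below are invented for illustration; no such annotations exist in Beam today.

```java
public class OrderingSketch {
    // Invented names for illustration; nothing like this exists in Beam today.
    enum Ordering { UNORDERED, TIMESTAMP_WITHIN_KEY }
    enum Behavior { ORDER_CREATING, ORDER_PRESERVING, ORDER_DESTROYING }

    /** Infer an output PCollection's ordering from the input's ordering and the transform's annotation. */
    static Ordering outputOrdering(Ordering input, Behavior behavior) {
        switch (behavior) {
            case ORDER_CREATING:   return Ordering.TIMESTAMP_WITHIN_KEY;
            case ORDER_PRESERVING: return input;
            default:               return Ordering.UNORDERED; // unspecified = order-destroying
        }
    }

    public static void main(String[] args) {
        // An order-preserving transform keeps whatever ordering its input had.
        System.out.println(outputOrdering(Ordering.TIMESTAMP_WITHIN_KEY, Behavior.ORDER_PRESERVING));
    }
}
```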


On Fri, Jan 19, 2024 at 10:40 AM Kenneth Knowles  wrote:

In this design space, what we have done in the past is:

1) ensure that runners all reject pipelines they cannot run correctly
2) if there is a default/workaround/slower implementation, provide it as an 
override

This is largely ignoring portability but I think/hope it will still work. At one time I 
put some effort into ensuring Java Pipeline objects and proto representations could 
roundtrip with all the necessary information for pre-portability runners to still work, 
which is the same prereqs as pre-portable "Override" implementations to still 
work.

TBH I'm 50/50 on the idea. If something is going to be implemented more slowly 
or less scalably as a fallback, I think it may be best to simply be upfront 
about being unable to really run it. It would depend on the situation. For 
requiring time sorted input, the manual implementation is probably similar to 
what a streaming runner might do, so it might make sense.

Kenn




Re: @RequiresTimeSortedInput adoption by runners

2024-01-19 Thread Robert Burke
I certainly don't have the deeper java insight here. So one more portable
based reply and then I'll step back on the Java specifics.

Portable runners only really have the "unknown Composite" fallback option,
where if the Composite's URN isn't known to the runner, it should use the
subgraph that is being wrapped.

I suppose the protocol could be expanded: if a composite transform with a
ParDo payload has a URN or features the runner can't handle, then it could
use the fallback graph as well.
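A simplified model of that fallback rule, with plain Java records standing in for the pipeline proto (the composite URN below is hypothetical): if the runner doesn't know a composite's URN, it recurses into the wrapped subgraph instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class CompositeFallback {
    // Simplified stand-in for a pipeline-proto transform; not the real model types.
    record Transform(String urn, List<Transform> subtransforms) {}

    /** Collect the transforms the runner will execute, recursing into unknown composites. */
    static void expand(Transform t, Set<String> knownUrns, List<String> plan) {
        if (knownUrns.contains(t.urn()) || t.subtransforms().isEmpty()) {
            plan.add(t.urn()); // runner understands the URN (or it's a leaf): execute directly
            return;
        }
        for (Transform sub : t.subtransforms()) {
            expand(sub, knownUrns, plan); // unknown composite: fall back to the wrapped subgraph
        }
    }

    public static void main(String[] args) {
        Transform sorted = new Transform(
            "beam:transform:time_sorted_pardo:v1", // hypothetical composite URN
            List.of(new Transform("beam:transform:pardo:v1", List.of())));
        List<String> plan = new ArrayList<>();
        expand(sorted, Set.of("beam:transform:pardo:v1"), plan);
        System.out.println(plan); // the runner falls back to the inner ParDo
    }
}
```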

The SDK would then still need to construct the fallback graph
into the Pipeline proto. This doesn't sound incompatible with what you've
suggested the Java SDK could do, but it avoids the runner needing to be
aware of a specific implementation requirement around a feature it doesn't
support.  If it has to do something specific to support an SDK specific
mechanism, that's still supporting the feature, but I fear it's not a great
road to tread on for runners to add SDK specific implementation details.

If a (portable) runner is going to spend work on doing something to handle
RequiresTimeSortedInput, it's probably easier to handle it generally than
to try to enable a Java specific work around. I'm not even sure how that
could work since the SDK would then need a special interpretation of what a
runner sent back for it to do any SDK side special backup handling, vs the
simple execution of the given transform.

It's entirely possible I've over simplified the "fallback" protocol
described above, so this thread is still useful for my Prism work,
especially if I see any similar situations once I start on the Java
Validates Runner suite.

Robert Burke
Beam Go Busybody


Re: @RequiresTimeSortedInput adoption by runners

2024-01-19 Thread Jan Lukavský
I was primarily focused on Java SDK (and core-construction-java), but 
generally speaking, any SDK can provide default expansion that runners 
can use so that it is not (should not be) required to implement this 
manually.
Currently, in the Java SDK, the annotation is wired up into 
StatefulDoFnRunner, which (as the name suggests) can be used for running 
stateful DoFns. The problem is that not every runner is using this 
facility. The Java SDK generally supports providing default expansions of 
transforms, but _only for transforms that do not have to work with 
dynamic state_. This is not the case for this annotation - a default 
implementation for @RequiresTimeSortedInput has to take another DoFn as 
input and wire up its lifecycle so that elements are buffered in a 
(dynamically created) buffer and fed into the downstream DoFn only when 
a timer fires.
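That buffer-and-timer lifecycle can be sketched in plain Java, with a TreeMap standing in for runner-managed per-key state and an explicit watermark argument standing in for the timer firing; this is not the actual StatefulDoFnRunner API, just the shape of what it does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.BiConsumer;

public class TimeSortedBuffer<V> {
    // Sorted per-key buffer keyed by event timestamp; stands in for runner-managed state.
    private final TreeMap<Long, List<V>> buffer = new TreeMap<>();

    void bufferElement(long timestampMillis, V value) {
        buffer.computeIfAbsent(timestampMillis, t -> new ArrayList<>()).add(value);
    }

    /** "Timer fires": feed everything at or before the watermark downstream, in timestamp order. */
    void onTimer(long watermarkMillis, BiConsumer<Long, V> downstream) {
        NavigableMap<Long, List<V>> ready = buffer.headMap(watermarkMillis, true);
        for (Map.Entry<Long, List<V>> e : ready.entrySet()) {
            for (V v : e.getValue()) downstream.accept(e.getKey(), v);
        }
        ready.clear(); // clearing the view also removes the emitted entries from the buffer
    }

    public static void main(String[] args) {
        TimeSortedBuffer<String> buf = new TimeSortedBuffer<>();
        buf.bufferElement(30, "c");
        buf.bufferElement(10, "a");
        buf.bufferElement(20, "b");
        // Emits "a" then "b" in timestamp order; "c" stays buffered past the watermark.
        buf.onTimer(25, (ts, v) -> System.out.println(ts + " " + v));
    }
}
```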


If I narrow down my line of thinking, it would be possible to:
 a) create something like "dynamic pipeline expansion", which would 
make it possible to work with PTransforms in this way (probably would 
require some ByteBuddy magic)
 b) wire this up to DoFnInvoker, which takes a DoFn and creates a class 
that is used by runners for feeding data


Option b) would ensure that actually all runners support such expansion, 
but seems to be somewhat hacky and too specific to this case. Moreover, 
it would require knowledge of whether the expansion is actually required 
by the runner (e.g. if the annotation is supported explicitly - most 
likely for batch execution). Therefore I'd be in favor of option a), as 
this might be reusable by a broader range of default expansions.


In other SDKs than Java this might have different implications, the 
reason why it is somewhat more complicated to do dynamic (or generic?) 
expansions of PTransforms in Java is mostly due to how DoFns are 
implemented in terms of annotations and the DoFnInvokers involved for 
efficiency.


 Jan


Re: @RequiresTimeSortedInput adoption by runners

2024-01-18 Thread Robert Burke
I agree that variable support across Runners does limit the adoption of a 
feature.  But it's also then limited if the SDKs and their local / direct 
runners don't yet support the feature. The Go SDK doesn't currently have a way 
of specifying that annotation, preventing use.  (The lack of mention of the 
Python direct runner in your list implies it's not yet supported by the Python 
SDK, and a quick search shows that's likely [0])

While not yet widely available to the other SDKs, Prism, the new Go SDK Local 
Runner, maintains data in event time sorted heaps [1]. The intent was to 
implement the annotation (among other features) once I start running the Java 
and Python Validates Runner suites against it.

I think stateful transforms are getting the event ordering on values for "free" 
as a result [2], but there's no special behavior at present if the DoFn is 
consuming the result of a Group By Key.

Part of the issue is that by definition, a GBK "loses" the timestamps of the 
values, and doesn't emit them, outside of using them to determine the resulting 
timestamp of the Key... [3]. To make use of the timestamp in the aggregation 
stage a runner would need to do something different in the GBK, namely sorting 
by the timestamp as the data is ingested, and keeping that timestamp around to 
continue the sort. This prevents a more efficient implementation of directly 
arranging the received element bytes into the Iterator format, requiring 
post-process filtering. Not hard, but a little dissatisfying.
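What that "something different" could look like, as a plain-Java sketch: keep each grouped value paired with the timestamp the GBK would otherwise drop, and sort by it. The TimestampedValue record here is invented for illustration (it is not Beam's class of the same name).

```java
import java.util.Comparator;
import java.util.List;

public class GbkTimestampSort {
    // Invented pairing of a grouped value with the timestamp a GBK would otherwise drop.
    record TimestampedValue<V>(long timestampMillis, V value) {}

    /** What a batch runner's grouping could do: keep the timestamp alongside each value and sort by it. */
    static <V> List<V> valuesInTimestampOrder(List<TimestampedValue<V>> grouped) {
        return grouped.stream()
                .sorted(Comparator.comparingLong(tv -> tv.timestampMillis()))
                .map(TimestampedValue::value)
                .toList();
    }

    public static void main(String[] args) {
        List<TimestampedValue<String>> grouped = List.of(
                new TimestampedValue<>(20, "b"),
                new TimestampedValue<>(10, "a"),
                new TimestampedValue<>(30, "c"));
        System.out.println(valuesInTimestampOrder(grouped)); // [a, b, c]
    }
}
```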

Skimming through the discussion, I agree with the general utility goal of the 
annotation, but as with many Beam features, there may be a discoverability 
problem. The feature isn't mentioned in the Programming Guide (AFAICT), and 
trying to find anything on the beam site, the top result is the Javadoc for the 
annotation (which is good, but you still need to know to look for it), and then 
the next time-related bit is OrderedListState, which doesn't yet have a 
meaningful portable representation last I checked [4], once again limiting 
adoption.

Probably the most critical bit is, while we have broad "handling" of the 
annotation, I'm hard pressed to say we even use the annotation outside of 
tests. A search [5] doesn't show any "Transforms" or "IOs" making use of it 
with the only markdown/documentation about it being the Beam 2.20.0 release 
notes saying it's now supported in Flink and Spark [6].

I will say, this isn't grounds for removing the feature, as I can only check 
what's in the repo, and not what end users have, but it does indicate we didn't 
drive the feature to completion and enable user adoption beyond "This Exists, 
and we can tell you about it if you ask.".

AFAICT this is just one of those features we built, but then proceeded not to 
use within Beam or evangelize. This is a point we could certainly do better 
on in Beam as a whole.

Robert Burke
Beam Go Busybody

[0]  
https://github.com/search?q=repo%3Aapache%2Fbeam+TIME_SORTED_INPUT+language%3APython&type=code

[1] 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L93

[2] 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1094

[3] 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1132

[4] 
https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+OrderedListState

[5] 
https://github.com/search?q=repo%3Aapache%2Fbeam+RequiresTimeSortedInput&type=code&p=2

[6] 
https://github.com/apache/beam/blob/b4c23b32f2b80ce052c8a235e5064c69f37df992/website/www/site/content/en/blog/beam-2.20.0.md?plain=1#L46

On 2024/01/18 16:14:56 Jan Lukavský wrote:
> Hi,
> 
> recently I came across the fact that most runners do not support the 
> @RequiresTimeSortedInput annotation for sorting per-key data by event 
> timestamp [1]. Actually, runners supporting it seem to be Direct java, 
> Flink and Dataflow batch (as it is a noop there). The annotation has 
> use-cases in time-series data processing, in transaction processing and 
> more. Though it is absolutely possible to implement the time-sorting 
> manually (e.g. [2]), this is actually efficient only in streaming mode; 
> in batch mode the runner typically wants to leverage the internal 
> sort-grouping it already does.
> 
> The original idea was to implement this annotation inside 
> StatefulDoFnRunner, which would be used by the majority of runners. It turns 
> out that this is not the case. The question now is, should we use an 
> alternative place to implement the annotation (e.g. Pipeline expansion, 
> or DoFnInvoker) so that more runners can benefit from it automatically 
> (at least for streaming case, batch case needs to be implemented 
> manually)? Does the community find the annotation useful? I'm linking a 
> rather old (and long :)) thread that preceded introduction of the 
> annotation [3] for more cont