Re: [External] Re: DISCUSS: Sorted MapState API

2020-09-28 Thread Kenneth Knowles
On Wed, Sep 16, 2020 at 8:48 AM Tyson Hamilton  wrote:

> The use case is to support an unbounded stream-stream join, where the
> elements are arriving in roughly time sorted order. Removing a specific
> element from the timestamp indexed collection is necessary when a match is
> found.
>

Just checking - this is an optimization when you already know that the join
is 1:1?

Kenn


> Having clearRange is helpful to expire elements that are no longer
> relevant according to a user-provided time based join predicate (e.g. WHEN
> ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).
>
> I'll think a bit more on how to use MapState instead if having a remove()
> like method for a single element isn't an option.
>
> On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax  wrote:
>
>> Hi,
>>
>> Currently we only support removing a timestamp range. You can remove a
>> single timestamp of course by removing [ts, ts+1), however if there are
>> multiple elements with the same timestamp this will remove all of those
>> elements.
>>
>> Does this fit your use case? If not, I wonder if MapState is closer to
>> what you are looking for?
>>
>> Reuven
>>
>> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton 
>> wrote:
>>
>>> Hi Reuven,
>>>
>>> I noticed that there was an implementation of the in-memory
>>> OrderedListState introduced [1]. Where can I find out more regarding the
>>> plan and design? Is there a design doc? I'd like to know more details about
>>> the implementation to see if it fits my use case. I was hoping it would
>>> have a remove(TimestampedValue e) method.
>>>
>>> Thanks,
>>> -Tyson
>>>
>>>
>>> [1]:
>>> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>>
>>>
>>> On 2020/08/03 21:41:46, Catlyn Kong  wrote:
>>> > Hey folks,
>>> >
>>> > Sry I'm late to this thread but this might be very helpful for the
>>> problem
>>> > we're dealing with. Do we have a design doc or a jira ticket I can
>>> follow?
>>> >
>>> > Cheers,
>>> > Catlyn
>>> >
>>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský  wrote:
>>> >
>>> > > My questions were just an example. I fully agree there is a
>>> fundamental
>>> > > need for a sorted state (of some form, and I also think this links to
>>> > > efficient implementation of retractions) - I was reacting to Kenn's
>>> question
>>> > > about BIP. This one would be pretty nice example why it would be
>>> good to
>>> > > have such a "process" - not everything can be solved on ML and there
>>> are
>>> > > fundamental decisions that might need a closer attention.
>>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>>> > >
>>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
>>> > > TimeSortedListState), though I went a bit further and also proposed
>>> a way
>>> > > to have a dynamic number of tagged TimeSortedBagStates.
>>> > >
>>> > > You are correct that the runner doesn't really have to store the
>>> data time
>>> > > sorted - what's actually needed is the ability to fetch and remove
>>> > > timestamp ranges of data (though that does include fetching the
>>> entire
>>> > > list); TimeOrderedState is probably a more accurate name than
>>> > > TimeSortedState. I don't think we could get away with operations
>>> that only
>>> > > act on the smallest timestamp, however we could limit the API to
>>> only being
>>> > > able to fetch and remove prefixes of data (ordered by timestamp).
>>> However
>>> > > if we support prefixes, we might as well support arbitrary subranges.
>>> > >
>>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský 
>>> wrote:
>>> > >
>>> > >> Big +1 for a BIP, as this might really help clarify all the pros
>>> and cons
>>> > >> of all possibilities. There seem to be questions that need
>>> answering and
>>> > >> motivating use cases - do we need sorted map state or can we solve
>>> our use
>>> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState?
>>> Does
>>> > >> that really have to be time-sorted structure, or does it "only"
>>> have to
>>> > >> have operations that can efficiently find and remove element with
>>> smallest
>>> > >> timestamp (like a PriorityQueue)?
>>> > >>
>>> > >> Jan
>>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>>> > >>
>>> > >> Zooming in from generic philosophy to be clear: adding time ordered
>>> > >> buffer to the Fn state API is *not* a shortcut. It has benefits that
>>> will
>>> > >> not be achieved by SDK-side implementation on top of either ordered
>>> or
>>> > >> unordered multimap. Are those benefits worth expanding the API? I
>>> don't
>>> > >> know.
>>> > >>
>>> > >> A change to allow a runner to have a specialized implementation for
>>> > >> time-buffered state would be one or more StateKey types, right?
>>> Reuven,
>>> > >> maybe put this and your Java API in a doc? A BIP? Seems like
>>> there's at
>>> > >> least the following to explore:
>>> > >>
>>> > >>  - how that Java API would map to an SDK-side implementation on top
>>> of
>>> > >> multimap state key
>>> > >>  - how that 

Re: [External] Re: DISCUSS: Sorted MapState API

2020-09-16 Thread Tyson Hamilton
The use case is to support an unbounded stream-stream join, where the
elements are arriving in roughly time sorted order. Removing a specific
element from the timestamp indexed collection is necessary when a match is
found. Having clearRange is helpful to expire elements that are no longer
relevant according to a user-provided time based join predicate (e.g. WHEN
ABS(leftElement.timestamp - rightElement.timestamp) < 5 minutes).

I'll think a bit more on how to use MapState instead if having a remove()
like method for a single element isn't an option.
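
A minimal in-memory sketch of this use case, in plain Java. It does not use
the Beam state API: JoinBuffer, findMatch, removeOne and size are hypothetical
names, and a TreeMap stands in for the timestamp-indexed collection.
clearRange mirrors the range removal discussed in this thread, and removeOne
is the kind of single-element removal asked about above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for one side of the join; not a Beam API.
class JoinBuffer {
  // Timestamp (millis) -> values buffered at that timestamp.
  private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

  void add(long timestamp, String value) {
    byTimestamp.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
  }

  // Returns the earliest buffered timestamp ts with
  // ABS(ts - timestamp) < maxSkewMillis, or null if there is none.
  // (Overflow near Long.MIN_VALUE / Long.MAX_VALUE is ignored in this sketch.)
  Long findMatch(long timestamp, long maxSkewMillis) {
    Map.Entry<Long, List<String>> e = byTimestamp.higherEntry(timestamp - maxSkewMillis);
    if (e != null && e.getKey() < timestamp + maxSkewMillis) {
      return e.getKey();
    }
    return null;
  }

  // Removes one occurrence of value at exactly this timestamp.
  boolean removeOne(long timestamp, String value) {
    List<String> values = byTimestamp.get(timestamp);
    if (values == null || !values.remove(value)) {
      return false;
    }
    if (values.isEmpty()) {
      byTimestamp.remove(timestamp);
    }
    return true;
  }

  // Drops every element with minTimestamp <= timestamp < limitTimestamp.
  void clearRange(long minTimestamp, long limitTimestamp) {
    byTimestamp.subMap(minTimestamp, true, limitTimestamp, false).clear();
  }

  int size() {
    int n = 0;
    for (List<String> values : byTimestamp.values()) {
      n += values.size();
    }
    return n;
  }
}
```

On a match, the joined element is dropped with removeOne; elements that can no
longer satisfy the predicate are expired in bulk with clearRange.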

On Tue, Sep 15, 2020 at 8:52 PM Reuven Lax  wrote:

> Hi,
>
> Currently we only support removing a timestamp range. You can remove a
> single timestamp of course by removing [ts, ts+1), however if there are
> multiple elements with the same timestamp this will remove all of those
> elements.
>
> Does this fit your use case? If not, I wonder if MapState is closer to
> what you are looking for?
>
> Reuven
>
> On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton  wrote:
>
>> Hi Reuven,
>>
>> I noticed that there was an implementation of the in-memory
>> OrderedListState introduced [1]. Where can I find out more regarding the
>> plan and design? Is there a design doc? I'd like to know more details about
>> the implementation to see if it fits my use case. I was hoping it would
>> have a remove(TimestampedValue e) method.
>>
>> Thanks,
>> -Tyson
>>
>>
>> [1]:
>> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>>
>>
>> On 2020/08/03 21:41:46, Catlyn Kong  wrote:
>> > Hey folks,
>> >
>> > Sry I'm late to this thread but this might be very helpful for the
>> problem
>> > we're dealing with. Do we have a design doc or a jira ticket I can
>> follow?
>> >
>> > Cheers,
>> > Catlyn
>> >
>> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský  wrote:
>> >
>> > > My questions were just an example. I fully agree there is a
>> fundamental
>> > > need for a sorted state (of some form, and I also think this links to
>> > > efficient implementation of retractions) - I was reacting to Kenn's
>> question
>> > > about BIP. This one would be pretty nice example why it would be good
>> to
>> > > have such a "process" - not everything can be solved on ML and there
>> are
>> > > fundamental decisions that might need a closer attention.
>> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
>> > >
>> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
>> > > TimeSortedListState), though I went a bit further and also proposed a
>> way
>> > > to have a dynamic number of tagged TimeSortedBagStates.
>> > >
>> > > You are correct that the runner doesn't really have to store the data
>> time
>> > > sorted - what's actually needed is the ability to fetch and remove
>> > > timestamp ranges of data (though that does include fetching the entire
>> > > list); TimeOrderedState is probably a more accurate name than
>> > > TimeSortedState. I don't think we could get away with operations that
>> only
>> > > act on the smallest timestamp, however we could limit the API to only
>> being
>> > > able to fetch and remove prefixes of data (ordered by timestamp).
>> However
>> > > if we support prefixes, we might as well support arbitrary subranges.
>> > >
>> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský  wrote:
>> > >
>> > >> Big +1 for a BIP, as this might really help clarify all the pros and
>> cons
>> > >> of all possibilities. There seem to be questions that need answering
>> and
>> > >> motivating use cases - do we need sorted map state or can we solve
>> our use
>> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState?
>> Does
>> > >> that really have to be time-sorted structure, or does it "only" have
>> to
>> > >> have operations that can efficiently find and remove element with
>> smallest
>> > >> timestamp (like a PriorityQueue)?
>> > >>
>> > >> Jan
>> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>> > >>
>> > >> Zooming in from generic philosophy to be clear: adding time ordered
>> > >> buffer to the Fn state API is *not* a shortcut. It has benefits that
>> will
>> > >> not be achieved by SDK-side implementation on top of either ordered
>> or
>> > >> unordered multimap. Are those benefits worth expanding the API? I
>> don't
>> > >> know.
>> > >>
>> > >> A change to allow a runner to have a specialized implementation for
>> > >> time-buffered state would be one or more StateKey types, right?
>> Reuven,
>> > >> maybe put this and your Java API in a doc? A BIP? Seems like there's
>> at
>> > >> least the following to explore:
>> > >>
>> > >>  - how that Java API would map to an SDK-side implementation on top
>> of
>> > >> multimap state key
>> > >>  - how that Java API would map to a new StateKey
>> > >>  - whether there's actually more than one relevant implementation of
>> that
>> > >> StateKey
>> > >>  - whether SDK-side implementation on some other state key would be
>> > >> performant enough in all SDK languages (present and future)
>> > >>
>> > 

Re: [External] Re: DISCUSS: Sorted MapState API

2020-09-15 Thread Reuven Lax
Hi,

Currently we only support removing a timestamp range. You can remove a
single timestamp of course by removing [ts, ts+1), however if there are
multiple elements with the same timestamp this will remove all of those
elements.

Does this fit your use case? If not, I wonder if MapState is closer to what
you are looking for?
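
A small in-memory model of that caveat, in plain Java. OrderedListSketch is a
hypothetical name and this is not the Beam implementation; it only assumes
that range removal takes a half-open interval [minTs, limitTs), as described
above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of a timestamp-ordered list; not a Beam class.
class OrderedListSketch {
  private final TreeMap<Long, List<String>> entries = new TreeMap<>();

  void add(long ts, String value) {
    entries.computeIfAbsent(ts, t -> new ArrayList<>()).add(value);
  }

  // Removes every element whose timestamp lies in [minTs, limitTs).
  void clearRange(long minTs, long limitTs) {
    entries.subMap(minTs, true, limitTs, false).clear();
  }

  // Returns all remaining values in timestamp order.
  List<String> read() {
    List<String> out = new ArrayList<>();
    for (List<String> values : entries.values()) {
      out.addAll(values);
    }
    return out;
  }
}
```

With "a" and "b" both added at timestamp 5 and "c" at timestamp 6, calling
clearRange(5, 6) leaves only "c": there is no way to drop "a" without also
dropping "b".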

Reuven

On Tue, Sep 15, 2020 at 2:33 PM Tyson Hamilton  wrote:

> Hi Reuven,
>
> I noticed that there was an implementation of the in-memory
> OrderedListState introduced [1]. Where can I find out more regarding the
> plan and design? Is there a design doc? I'd like to know more details about
> the implementation to see if it fits my use case. I was hoping it would
> have a remove(TimestampedValue e) method.
>
> Thanks,
> -Tyson
>
>
> [1]:
> https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41
>
>
> On 2020/08/03 21:41:46, Catlyn Kong  wrote:
> > Hey folks,
> >
> > Sry I'm late to this thread but this might be very helpful for the
> problem
> > we're dealing with. Do we have a design doc or a jira ticket I can
> follow?
> >
> > Cheers,
> > Catlyn
> >
> > On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský  wrote:
> >
> > > My questions were just an example. I fully agree there is a fundamental
> > > need for a sorted state (of some form, and I also think this links to
> > > efficient implementation of retractions) - I was reacting to Kenn's
> question
> > > about BIP. This one would be pretty nice example why it would be good
> to
> > > have such a "process" - not everything can be solved on ML and there
> are
> > > fundamental decisions that might need a closer attention.
> > > On 6/18/20 5:28 PM, Reuven Lax wrote:
> > >
> > > Jan - my proposal is exactly TimeSortedBagState (more accurately -
> > > TimeSortedListState), though I went a bit further and also proposed a
> way
> > > to have a dynamic number of tagged TimeSortedBagStates.
> > >
> > > You are correct that the runner doesn't really have to store the data
> time
> > > sorted - what's actually needed is the ability to fetch and remove
> > > timestamp ranges of data (though that does include fetching the entire
> > > list); TimeOrderedState is probably a more accurate name than
> > > TimeSortedState. I don't think we could get away with operations that
> only
> > > act on the smallest timestamp, however we could limit the API to only
> being
> > > able to fetch and remove prefixes of data (ordered by timestamp).
> However
> > > if we support prefixes, we might as well support arbitrary subranges.
> > >
> > > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský  wrote:
> > >
> > >> Big +1 for a BIP, as this might really help clarify all the pros and
> cons
> > >> of all possibilities. There seem to be questions that need answering
> and
> > >> motivating use cases - do we need sorted map state or can we solve
> our use
> > >> cases by something simpler - e.g. the mentioned TimeSortedBagState?
> Does
> > >> that really have to be time-sorted structure, or does it "only" have
> to
> > >> have operations that can efficiently find and remove element with
> smallest
> > >> timestamp (like a PriorityQueue)?
> > >>
> > >> Jan
> > >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
> > >>
> > >> Zooming in from generic philosophy to be clear: adding time ordered
> > >> buffer to the Fn state API is *not* a shortcut. It has benefits that
> will
> > >> not be achieved by SDK-side implementation on top of either ordered or
> > >> unordered multimap. Are those benefits worth expanding the API? I
> don't
> > >> know.
> > >>
> > >> A change to allow a runner to have a specialized implementation for
> > >> time-buffered state would be one or more StateKey types, right?
> Reuven,
> > >> maybe put this and your Java API in a doc? A BIP? Seems like there's
> at
> > >> least the following to explore:
> > >>
> > >>  - how that Java API would map to an SDK-side implementation on top of
> > >> multimap state key
> > >>  - how that Java API would map to a new StateKey
> > >>  - whether there's actually more than one relevant implementation of
> that
> > >> StateKey
> > >>  - whether SDK-side implementation on some other state key would be
> > >> performant enough in all SDK languages (present and future)
> > >>
> > >> Zooming back out to generic philosophy: Proliferation of StateKey
> > >> types tuned by runners (which can very easily still share
> implementation)
> > >> is probably better than proliferation of complex SDK-side
> implementations
> > >> with varying completeness and performance.
> > >>
> > >> Kenn
> > >>
> > >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax  wrote:
> > >>
> > >>> It might help for me to describe what I have in mind. I'm still
> > >>> proposing that we build multimap, just not a globally-sorted
> multimap.
> > >>>
> > >>> My previous proposal was that we provide a Multimap state
> > >>> type, sorted by key. This would have two additional operations -
> > >>> multimap.getRange(startKey, endKey) and
> 

Re: [External] Re: DISCUSS: Sorted MapState API

2020-09-15 Thread Tyson Hamilton
Hi Reuven,

I noticed that there was an implementation of the in-memory OrderedListState 
introduced [1]. Where can I find out more regarding the plan and design? Is 
there a design doc? I'd like to know more details about the implementation to 
see if it fits my use case. I was hoping it would have a 
remove(TimestampedValue e) method.

Thanks,
-Tyson


[1]: 
https://github.com/apache/beam/commit/9d0d0b0c4506b288164b155c5ce3a23d76db3c41


On 2020/08/03 21:41:46, Catlyn Kong  wrote: 
> Hey folks,
> 
> Sry I'm late to this thread but this might be very helpful for the problem
> we're dealing with. Do we have a design doc or a jira ticket I can follow?
> 
> Cheers,
> Catlyn
> 
> On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský  wrote:
> 
> > My questions were just an example. I fully agree there is a fundamental
> > need for a sorted state (of some form, and I also think this links to
> > efficient implementation of retractions) - I was reacting to Kenn's question
> > about BIP. This one would be pretty nice example why it would be good to
> > have such a "process" - not everything can be solved on ML and there are
> > fundamental decisions that might need a closer attention.
> > On 6/18/20 5:28 PM, Reuven Lax wrote:
> >
> > Jan - my proposal is exactly TimeSortedBagState (more accurately -
> > TimeSortedListState), though I went a bit further and also proposed a way
> > to have a dynamic number of tagged TimeSortedBagStates.
> >
> > You are correct that the runner doesn't really have to store the data time
> > sorted - what's actually needed is the ability to fetch and remove
> > timestamp ranges of data (though that does include fetching the entire
> > list); TimeOrderedState is probably a more accurate name than
> > TimeSortedState. I don't think we could get away with operations that only
> > act on the smallest timestamp, however we could limit the API to only being
> > able to fetch and remove prefixes of data (ordered by timestamp). However
> > if we support prefixes, we might as well support arbitrary subranges.
> >
> > On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský  wrote:
> >
> >> Big +1 for a BIP, as this might really help clarify all the pros and cons
> >> of all possibilities. There seem to be questions that need answering and
> >> motivating use cases - do we need sorted map state or can we solve our use
> >> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does
> >> that really have to be time-sorted structure, or does it "only" have to
> >> have operations that can efficiently find and remove element with smallest
> >> timestamp (like a PriorityQueue)?
> >>
> >> Jan
> >> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
> >>
> >> Zooming in from generic philosophy to be clear: adding time ordered
> >> buffer to the Fn state API is *not* a shortcut. It has benefits that will
> >> not be achieved by SDK-side implementation on top of either ordered or
> >> unordered multimap. Are those benefits worth expanding the API? I don't
> >> know.
> >>
> >> A change to allow a runner to have a specialized implementation for
> >> time-buffered state would be one or more StateKey types, right? Reuven,
> >> maybe put this and your Java API in a doc? A BIP? Seems like there's at
> >> least the following to explore:
> >>
> >>  - how that Java API would map to an SDK-side implementation on top of
> >> multimap state key
> >>  - how that Java API would map to a new StateKey
> >>  - whether there's actually more than one relevant implementation of that
> >> StateKey
> >>  - whether SDK-side implementation on some other state key would be
> >> performant enough in all SDK languages (present and future)
> >>
> >> Zooming back out to generic philosophy: Proliferation of StateKey
> >> types tuned by runners (which can very easily still share implementation)
> >> is probably better than proliferation of complex SDK-side implementations
> >> with varying completeness and performance.
> >>
> >> Kenn
> >>
> >> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax  wrote:
> >>
> >>> It might help for me to describe what I have in mind. I'm still
> >>> proposing that we build multimap, just not a globally-sorted multimap.
> >>>
> >>> My previous proposal was that we provide a Multimap state
> >>> type, sorted by key. This would have two additional operations -
> >>> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
> >>> endKey). The primary use case was timestamp sorting, but I felt that a
> >>> sorted multimap provided a nice generalization - after all, you can simply
> >>> key the multimap by timestamp to get timestamp sorting.
> >>>
> >>> This approach had some issues immediately that would take some work to
> >>> solve. Since a multimap key can have any type and a runner will only be
> >>> able to sort by encoded type, we would need to introduce a concept of
> >>> order-preserving coders into Beam and plumb that through. Robert pointed
> >>> out that even our existing standard coders for simple integral 

Re: [External] Re: DISCUSS: Sorted MapState API

2020-08-03 Thread Catlyn Kong
Hey folks,

Sry I'm late to this thread but this might be very helpful for the problem
we're dealing with. Do we have a design doc or a jira ticket I can follow?

Cheers,
Catlyn

On Thu, Jun 18, 2020 at 1:11 PM Jan Lukavský  wrote:

> My questions were just an example. I fully agree there is a fundamental
> need for a sorted state (of some form, and I also think this links to
> efficient implementation of retractions) - I was reacting to Kenn's question
> about BIP. This one would be pretty nice example why it would be good to
> have such a "process" - not everything can be solved on ML and there are
> fundamental decisions that might need a closer attention.
> On 6/18/20 5:28 PM, Reuven Lax wrote:
>
> Jan - my proposal is exactly TimeSortedBagState (more accurately -
> TimeSortedListState), though I went a bit further and also proposed a way
> to have a dynamic number of tagged TimeSortedBagStates.
>
> You are correct that the runner doesn't really have to store the data time
> sorted - what's actually needed is the ability to fetch and remove
> timestamp ranges of data (though that does include fetching the entire
> list); TimeOrderedState is probably a more accurate name than
> TimeSortedState. I don't think we could get away with operations that only
> act on the smallest timestamp, however we could limit the API to only being
> able to fetch and remove prefixes of data (ordered by timestamp). However
> if we support prefixes, we might as well support arbitrary subranges.
>
> On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský  wrote:
>
>> Big +1 for a BIP, as this might really help clarify all the pros and cons
>> of all possibilities. There seem to be questions that need answering and
>> motivating use cases - do we need sorted map state or can we solve our use
>> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does
>> that really have to be time-sorted structure, or does it "only" have to
>> have operations that can efficiently find and remove element with smallest
>> timestamp (like a PriorityQueue)?
>>
>> Jan
>> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>>
>> Zooming in from generic philosophy to be clear: adding time ordered
>> buffer to the Fn state API is *not* a shortcut. It has benefits that will
>> not be achieved by SDK-side implementation on top of either ordered or
>> unordered multimap. Are those benefits worth expanding the API? I don't
>> know.
>>
>> A change to allow a runner to have a specialized implementation for
>> time-buffered state would be one or more StateKey types, right? Reuven,
>> maybe put this and your Java API in a doc? A BIP? Seems like there's at
>> least the following to explore:
>>
>>  - how that Java API would map to an SDK-side implementation on top of
>> multimap state key
>>  - how that Java API would map to a new StateKey
>>  - whether there's actually more than one relevant implementation of that
>> StateKey
>>  - whether SDK-side implementation on some other state key would be
>> performant enough in all SDK languages (present and future)
>>
>> Zooming back out to generic philosophy: Proliferation of StateKey
>> types tuned by runners (which can very easily still share implementation)
>> is probably better than proliferation of complex SDK-side implementations
>> with varying completeness and performance.
>>
>> Kenn
>>
>> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax  wrote:
>>
>>> It might help for me to describe what I have in mind. I'm still
>>> proposing that we build multimap, just not a globally-sorted multimap.
>>>
>>> My previous proposal was that we provide a Multimap state
>>> type, sorted by key. This would have two additional operations -
>>> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
>>> endKey). The primary use case was timestamp sorting, but I felt that a
>>> sorted multimap provided a nice generalization - after all, you can simply
>>> key the multimap by timestamp to get timestamp sorting.
>>>
>>> This approach had some issues immediately that would take some work to
>>> solve. Since a multimap key can have any type and a runner will only be
>>> able to sort by encoded type, we would need to introduce a concept of
>>> order-preserving coders into Beam and plumb that through. Robert pointed
>>> out that even our existing standard coders for simple integral types don't
>>> preserve order, so there will likely be surprises here.
>>>
>>> My current proposal is for a multimap that is not sorted by key, but
>>> that can support ordered values for a single key. Remember that a multimap
>>> maps K -> Iterable, so this means that each individual Iterable is
>>> ordered, but the keys have no specific order relative to each other. This
>>> is not too different from many multimap implementations where the keys are
>>> unordered, but the list of values for a single key at least has a stable
>>> order.
>>>
>>> The interface would look like this:
>>>
>>> public interface MultimapState extends State {
>>>   // Add a 

Re: DISCUSS: Sorted MapState API

2020-06-18 Thread Jan Lukavský
My questions were just an example. I fully agree there is a fundamental
need for a sorted state (of some form, and I also think this links to
efficient implementation of retractions) - I was reacting to Kenn's
question about a BIP. This would be a pretty nice example of why it would
be good to have such a "process" - not everything can be solved on the ML,
and there are fundamental decisions that might need closer attention.


On 6/18/20 5:28 PM, Reuven Lax wrote:
Jan - my proposal is exactly TimeSortedBagState (more accurately - 
TimeSortedListState), though I went a bit further and also proposed a 
way to have a dynamic number of tagged TimeSortedBagStates.


You are correct that the runner doesn't really have to store the data 
time sorted - what's actually needed is the ability to fetch and 
remove timestamp ranges of data (though that does include fetching the 
entire list); TimeOrderedState is probably a more accurate name than
TimeSortedState. I don't think we could get away with operations that 
only act on the smallest timestamp, however we could limit the API to 
only being able to fetch and remove prefixes of data (ordered by 
timestamp). However if we support prefixes, we might as well support 
arbitrary subranges.


On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský wrote:


Big +1 for a BIP, as this might really help clarify all the pros
and cons of all possibilities. There seem to be questions that
need answering and motivating use cases - do we need sorted map
state or can we solve our use cases by something simpler - e.g.
the mentioned TimeSortedBagState? Does that really have to be
time-sorted structure, or does it "only" have to have operations
that can efficiently find and remove element with smallest
timestamp (like a PriorityQueue)?

Jan

On 6/18/20 5:32 AM, Kenneth Knowles wrote:

Zooming in from generic philosophy to be clear: adding time
ordered buffer to the Fn state API is *not* a shortcut. It has
benefits that will not be achieved by SDK-side implementation on
top of either ordered or unordered multimap. Are those benefits
worth expanding the API? I don't know.

A change to allow a runner to have a specialized implementation
for time-buffered state would be one or more StateKey types,
right? Reuven, maybe put this and your Java API in a doc? A BIP?
Seems like there's at least the following to explore:

 - how that Java API would map to an SDK-side implementation on
top of multimap state key
 - how that Java API would map to a new StateKey
 - whether there's actually more than one relevant implementation
of that StateKey
 - whether SDK-side implementation on some other state key would
be performant enough in all SDK languages (present and future)

Zooming back out to generic philosophy: Proliferation of StateKey
types tuned by runners (which can very easily still share
implementation) is probably better than proliferation of complex
SDK-side implementations with varying completeness and performance.

Kenn

On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax wrote:

It might help for me to describe what I have in mind. I'm
still proposing that we build multimap, just not a
globally-sorted multimap.

My previous proposal was that we provide a Multimap state type, sorted by key. This would have two
additional operations - multimap.getRange(startKey, endKey)
and multimap.deleteRange(startKey, endKey). The primary use
case was timestamp sorting, but I felt that a sorted multimap
provided a nice generalization - after all, you can simply
key the multimap by timestamp to get timestamp sorting.

This approach had some issues immediately that would take
some work to solve. Since a multimap key can have any type
and a runner will only be able to sort by encoded type, we
would need to introduce a concept of order-preserving coders
into Beam and plumb that through. Robert pointed out that
even our existing standard coders for simple integral types
don't preserve order, so there will likely be surprises here.
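
To illustrate the ordering problem, here is a sketch in plain Java. It
assumes a LEB128-style variable-length integer encoding (low-order 7 bits
first, high bit as a continuation flag), written from scratch here; whether
a given Beam coder uses exactly this layout is an assumption. VarintOrderDemo
and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;

// Illustration only: hand-written encoders, not Beam coder classes.
class VarintOrderDemo {
  // LEB128-style unsigned varint: low 7 bits first, 0x80 marks "more bytes".
  static byte[] encodeVarint(long value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80));
      value >>>= 7;
    }
    out.write((int) value);
    return out.toByteArray();
  }

  // Fixed-width big-endian encoding; order-preserving for non-negative values.
  static byte[] encodeBigEndian(long value) {
    byte[] out = new byte[8];
    for (int i = 7; i >= 0; i--) {
      out[i] = (byte) value;
      value >>>= 8;
    }
    return out;
  }

  // Lexicographic comparison of unsigned bytes, as a byte-sorted store would do.
  static int compareBytes(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

Under this encoding 255 becomes the bytes FF 01 and 256 becomes 80 02, so a
runner sorting by encoded bytes would place 256 before 255. The fixed-width
big-endian form keeps the two in numeric order.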

My current proposal is for a multimap that is not sorted by
key, but that can support ordered values for a single key.
Remember that a multimap maps K -> Iterable, so this means
that each individual Iterable is ordered, but the keys
have no specific order relative to each other. This is not
too different from many multimap implementations where the
keys are unordered, but the list of values for a single key
at least has a stable order.

The interface would look like this:

public interface MultimapState extends State {
  // Add a value with a default timestamp.
  void put(K key, V value);

  // Add 

Re: DISCUSS: Sorted MapState API

2020-06-18 Thread Reuven Lax
Jan - my proposal is exactly TimeSortedBagState (more accurately -
TimeSortedListState), though I went a bit further and also proposed a way
to have a dynamic number of tagged TimeSortedBagStates.

You are correct that the runner doesn't really have to store the data time
sorted - what's actually needed is the ability to fetch and remove
timestamp ranges of data (though that does include fetching the entire
list); TimeOrderedState is probably a more accurate name than
TimeSortedState. I don't think we could get away with operations that only
act on the smallest timestamp, however we could limit the API to only being
able to fetch and remove prefixes of data (ordered by timestamp). However
if we support prefixes, we might as well support arbitrary subranges.
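
As a rough sketch of why prefixes and subranges cost about the same at the
API level, here is a plain Java model. TimeOrderedSketch and its method names
are hypothetical; it is an in-memory stand-in, not the proposed Beam
interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model; a runner need not store data this way.
class TimeOrderedSketch {
  private final TreeMap<Long, List<String>> entries = new TreeMap<>();

  void add(long ts, String value) {
    entries.computeIfAbsent(ts, t -> new ArrayList<>()).add(value);
  }

  // General form: values with minTs <= timestamp < limitTs, in timestamp order.
  List<String> readRange(long minTs, long limitTs) {
    List<String> out = new ArrayList<>();
    for (List<String> values : entries.subMap(minTs, true, limitTs, false).values()) {
      out.addAll(values);
    }
    return out;
  }

  // General form: removes values with minTs <= timestamp < limitTs.
  void clearRange(long minTs, long limitTs) {
    entries.subMap(minTs, true, limitTs, false).clear();
  }

  // Prefix operations are ranges that start at the minimum timestamp.
  List<String> readPrefix(long limitTs) {
    return readRange(Long.MIN_VALUE, limitTs);
  }

  void clearPrefix(long limitTs) {
    clearRange(Long.MIN_VALUE, limitTs);
  }
}
```

In this model the prefix methods are one-line wrappers over the range
methods, which is the sense in which supporting prefixes already gets you
most of the way to arbitrary subranges.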

On Thu, Jun 18, 2020 at 7:26 AM Jan Lukavský  wrote:

> Big +1 for a BIP, as this might really help clarify all the pros and cons
> of all possibilities. There seem to be questions that need answering and
> motivating use cases - do we need sorted map state or can we solve our use
> cases by something simpler - e.g. the mentioned TimeSortedBagState? Does
> that really have to be time-sorted structure, or does it "only" have to
> have operations that can efficiently find and remove element with smallest
> timestamp (like a PriorityQueue)?
>
> Jan
> On 6/18/20 5:32 AM, Kenneth Knowles wrote:
>
> Zooming in from generic philosophy to be clear: adding time ordered buffer
> to the Fn state API is *not* a shortcut. It has benefits that will not be
> achieved by SDK-side implementation on top of either ordered or unordered
> multimap. Are those benefits worth expanding the API? I don't know.
>
> A change to allow a runner to have a specialized implementation for
> time-buffered state would be one or more StateKey types, right? Reuven,
> maybe put this and your Java API in a doc? A BIP? Seems like there's at
> least the following to explore:
>
>  - how that Java API would map to an SDK-side implementation on top of
> multimap state key
>  - how that Java API would map to a new StateKey
>  - whether there's actually more than one relevant implementation of that
> StateKey
>  - whether SDK-side implementation on some other state key would be
> performant enough in all SDK languages (present and future)
>
> Zooming back out to generic philosophy: Proliferation of StateKey
> types tuned by runners (which can very easily still share implementation)
> is probably better than proliferation of complex SDK-side implementations
> with varying completeness and performance.
>
> Kenn
>
> On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax  wrote:
>
>> It might help for me to describe what I have in mind. I'm still proposing
>> that we build multimap, just not a globally-sorted multimap.
>>
>> My previous proposal was that we provide a Multimap state
>> type, sorted by key. This would have two additional operations -
>> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
>> endKey). The primary use case was timestamp sorting, but I felt that a
>> sorted multimap provided a nice generalization - after all, you can simply
>> key the multimap by timestamp to get timestamp sorting.
>>
>> This approach had some issues immediately that would take some work to
>> solve. Since a multimap key can have any type and a runner will only be
>> able to sort by encoded type, we would need to introduce a concept of
>> order-preserving coders into Beam and plumb that through. Robert pointed
>> out that even our existing standard coders for simple integral types don't
>> preserve order, so there will likely be surprises here.
>>
>> My current proposal is for a multimap that is not sorted by key, but that
>> can support ordered values for a single key. Remember that a multimap maps
>> K -> Iterable<V>, so this means that each individual Iterable is
>> ordered, but the keys have no specific order relative to each other. This
>> is not too different from many multimap implementations where the keys are
>> unordered, but the list of values for a single key at least has a stable
>> order.
>>
>> The interface would look like this:
>>
>> public interface MultimapState<K, V> extends State {
>>   // Add a value with a default timestamp.
>>   void put(K key, V value);
>>
>>   // Add a timestamped value.
>>   void put(K key, TimestampedValue<V> value);
>>
>>   // Remove all values for a key.
>>   void remove(K key);
>>
>>   // Remove all values for a key with timestamps within the specified
>>   // range.
>>   void removeRange(K key, Instant startTs, Instant endTs);
>>
>>   // Get an Iterable of values for a key. The Iterable will be returned
>>   // sorted by timestamp.
>>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>>
>>   // Get an Iterable of values for a key in the specified range. The
>>   // Iterable will be returned sorted by timestamp.
>>   ReadableState<Iterable<TimestampedValue<V>>> getRange(
>>       K key, Instant startTs, Instant endTs);
>>
>>   ReadableState<Iterable<K>> keys();
>>   ReadableState<Iterable<TimestampedValue<V>>> values();
>>   ReadableState> entries;
>> }
>>
>> We can of course 

Re: DISCUSS: Sorted MapState API

2020-06-18 Thread Jan Lukavský
Big +1 for a BIP, as this might really help clarify all the pros and 
cons of all possibilities. There seem to be questions that need 
answering and motivating use cases - do we need sorted map state or can 
we solve our use cases by something simpler - e.g. the mentioned 
TimeSortedBagState? Does that really have to be time-sorted structure, 
or does it "only" have to have operations that can efficiently find and 
remove element with smallest timestamp (like a PriorityQueue)?


Jan

On 6/18/20 5:32 AM, Kenneth Knowles wrote:
Zooming in from generic philosophy to be clear: adding time ordered 
buffer to the Fn state API is *not* a shortcut. It has benefits that
will not be achieved by SDK-side implementation on top of either 
ordered or unordered multimap. Are those benefits worth expanding 
the API? I don't know.


A change to allow a runner to have a specialized implementation for 
time-buffered state would be one or more StateKey types, right? 
Reuven, maybe put this and your Java API in a doc? A BIP? Seems like 
there's at least the following to explore:


 - how that Java API would map to an SDK-side implementation on top of 
multimap state key

 - how that Java API would map to a new StateKey
 - whether there's actually more than one relevant implementation of 
that StateKey
 - whether SDK-side implementation on some other state key would be 
performant enough in all SDK languages (present and future)


Zooming back out to generic philosophy: Proliferation of StateKey 
types tuned by runners (which can very easily still share 
implementation) is probably better than proliferation of complex 
SDK-side implementations with varying completeness and performance.


Kenn

On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax wrote:


It might help for me to describe what I have in mind. I'm still
proposing that we build multimap, just not a globally-sorted
multimap.

My previous proposal was that we provide a Multimap
state type, sorted by key. This would have two additional
operations - multimap.getRange(startKey, endKey) and
multimap.deleteRange(startKey, endKey). The primary use case was
timestamp sorting, but I felt that a sorted multimap provided a
nice generalization - after all, you can simply key the multimap
by timestamp to get timestamp sorting.

This approach had some issues immediately that would take some
work to solve. Since a multimap key can have any type and a runner
will only be able to sort by encoded type, we would need to
introduce a concept of order-preserving coders into Beam and
plumb that through. Robert pointed out that even our existing
standard coders for simple integral types don't preserve order, so
there will likely be surprises here.

My current proposal is for a multimap that is not sorted by key,
but that can support ordered values for a single key. Remember
that a multimap maps K -> Iterable<V>, so this means that each
individual Iterable is ordered, but the keys have no specific
order relative to each other. This is not too different from many
multimap implementations where the keys are unordered, but the
list of values for a single key at least has a stable order.

The interface would look like this:

public interface MultimapState<K, V> extends State {
  // Add a value with a default timestamp.
  void put(K key, V value);

  // Add a timestamped value.
  void put(K key, TimestampedValue<V> value);

  // Remove all values for a key.
  void remove(K key);

  // Remove all values for a key with timestamps within the
  // specified range.
  void removeRange(K key, Instant startTs, Instant endTs);

  // Get an Iterable of values for a key. The Iterable will be
  // returned sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> get(K key);

  // Get an Iterable of values for a key in the specified range. The
  // Iterable will be returned sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> getRange(
      K key, Instant startTs, Instant endTs);

  ReadableState<Iterable<K>> keys();
  ReadableState<Iterable<TimestampedValue<V>>> values();
  ReadableState> entries;
}

We can of course provide helper functions that allow using
MultimapState without dealing with TimestampedValue for users who
only want a multimap and don't want sorting.

I think many users will only need a single sorted list - not a
full multimap. It's worth offering this as well, and we can simply
build it on top of MultimapState. It will look like an extension
of BagState:

public interface TimestampSortedListState<T> extends State {
  void add(TimestampedValue<T> value);
  Iterable<TimestampedValue<T>> read();
  Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
  void clearRange(Instant startTs, Instant endTs);
}


On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik wrote:

The portability layer is meant to live across multiple
versions of Beam and I don't 

Re: DISCUSS: Sorted MapState API

2020-06-17 Thread Kenneth Knowles
Zooming in from generic philosophy to be clear: adding time ordered buffer
to the Fn state API is *not* a shortcut. It has benefits that will not be
achieved by SDK-side implementation on top of either ordered or unordered
multimap. Are those benefits worth expanding the API? I don't know.

A change to allow a runner to have a specialized implementation for
time-buffered state would be one or more StateKey types, right? Reuven,
maybe put this and your Java API in a doc? A BIP? Seems like there's at
least the following to explore:

 - how that Java API would map to an SDK-side implementation on top of
multimap state key
 - how that Java API would map to a new StateKey
 - whether there's actually more than one relevant implementation of that
StateKey
 - whether SDK-side implementation on some other state key would be
performant enough in all SDK languages (present and future)

Zooming back out to generic philosophy: Proliferation of StateKey
types tuned by runners (which can very easily still share implementation)
is probably better than proliferation of complex SDK-side implementations
with varying completeness and performance.

Kenn

On Wed, Jun 17, 2020 at 3:24 PM Reuven Lax  wrote:

> It might help for me to describe what I have in mind. I'm still proposing
> that we build multimap, just not a globally-sorted multimap.
>
> My previous proposal was that we provide a Multimap state
> type, sorted by key. This would have two additional operations -
> multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
> endKey). The primary use case was timestamp sorting, but I felt that a
> sorted multimap provided a nice generalization - after all, you can simply
> key the multimap by timestamp to get timestamp sorting.
>
> This approach had some issues immediately that would take some work to
> solve. Since a multimap key can have any type and a runner will only be
> able to sort by encoded type, we would need to introduce a concept of
> order-preserving coders into Beam and plumb that through. Robert pointed
> out that even our existing standard coders for simple integral types don't
> preserve order, so there will likely be surprises here.
>
> My current proposal is for a multimap that is not sorted by key, but that
> can support ordered values for a single key. Remember that a multimap maps
> K -> Iterable<V>, so this means that each individual Iterable is
> ordered, but the keys have no specific order relative to each other. This
> is not too different from many multimap implementations where the keys are
> unordered, but the list of values for a single key at least has a stable
> order.
>
> The interface would look like this:
>
> public interface MultimapState<K, V> extends State {
>   // Add a value with a default timestamp.
>   void put(K key, V value);
>
>   // Add a timestamped value.
>   void put(K key, TimestampedValue<V> value);
>
>   // Remove all values for a key.
>   void remove(K key);
>
>   // Remove all values for a key with timestamps within the specified
>   // range.
>   void removeRange(K key, Instant startTs, Instant endTs);
>
>   // Get an Iterable of values for a key. The Iterable will be returned
>   // sorted by timestamp.
>   ReadableState<Iterable<TimestampedValue<V>>> get(K key);
>
>   // Get an Iterable of values for a key in the specified range. The
>   // Iterable will be returned sorted by timestamp.
>   ReadableState<Iterable<TimestampedValue<V>>> getRange(
>       K key, Instant startTs, Instant endTs);
>
>   ReadableState<Iterable<K>> keys();
>   ReadableState<Iterable<TimestampedValue<V>>> values();
>   ReadableState> entries;
> }
>
> We can of course provide helper functions that allow using MultimapState
> without dealing with TimestampedValue for users who only want a multimap and
> don't want sorting.
>
> I think many users will only need a single sorted list - not a full
> multimap. It's worth offering this as well, and we can simply build it on
> top of MultimapState. It will look like an extension of BagState:
>
> public interface TimestampSortedListState<T> extends State {
>   void add(TimestampedValue<T> value);
>   Iterable<TimestampedValue<T>> read();
>   Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
>   void clearRange(Instant startTs, Instant endTs);
> }
>
>
> On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik  wrote:
>
>> The portability layer is meant to live across multiple versions of Beam,
>> and I don't think we should approach it by just doing the simple and useful
>> thing now, since I believe that will lead to a proliferation of the API.
>>
>> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles  wrote:
>>
>>> I have thoughts on the subject of whether to have APIs just for the
>>> lowest-level building blocks versus having APIs for higher-level
>>> constructs. Specifically this applies to providing only unsorted multimap
>>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on
>>> time-ordered buffer; if it turns out to be easy to go all the way to sorted
>>> multimap that's nice-to-have; if it turns out to be easy to implement on
>>> top of unsorted map state that should probably be under the hood
>>>
>>> 

Re: DISCUSS: Sorted MapState API

2020-06-17 Thread Reuven Lax
It might help for me to describe what I have in mind. I'm still proposing
that we build multimap, just not a globally-sorted multimap.

My previous proposal was that we provide a Multimap state type,
sorted by key. This would have two additional operations -
multimap.getRange(startKey, endKey) and multimap.deleteRange(startKey,
endKey). The primary use case was timestamp sorting, but I felt that a
sorted multimap provided a nice generalization - after all, you can simply
key the multimap by timestamp to get timestamp sorting.

This approach had some issues immediately that would take some work to
solve. Since a multimap key can have any type and a runner will only be
able to sort by encoded type, we would need to introduce a concept of
order-preserving coders into Beam and plumb that through. Robert pointed
out that even our existing standard coders for simple integral types don't
preserve order, so there will likely be surprises here.
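
To make the ordering surprise concrete, here is a minimal sketch. It does
not use Beam's coders; the class and method names
(OrderPreservingEncodingSketch, encodePlain, encodeOrderPreserving,
compareEncoded) are hypothetical and written only for this illustration. It
assumes the runner orders encoded keys byte by byte, treating each byte as
unsigned. Under that assumption a plain big-endian two's complement encoding
of a long sorts negative values after positive ones, while flipping the sign
bit before encoding brings the byte order back in line with numeric order.

```java
import java.nio.ByteBuffer;

// Illustrative only: hand-written encodings, not Beam's standard coders.
class OrderPreservingEncodingSketch {

  // Plain big-endian two's complement. Negative values have the high bit
  // set, so they compare greater than positive values byte-wise.
  static byte[] encodePlain(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  // Flipping the sign bit maps the signed range monotonically onto the
  // unsigned range, so byte-wise order matches numeric order.
  static byte[] encodeOrderPreserving(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison that treats every byte as unsigned (assumed
  // here to be how a runner would order encoded keys).
  static int compareEncoded(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    // Numerically -1 < 1, but the plain encoding orders them the other way.
    System.out.println(compareEncoded(encodePlain(-1L), encodePlain(1L)) > 0);
    // With the sign bit flipped, encoded order agrees with numeric order.
    System.out.println(
        compareEncoded(encodeOrderPreserving(-1L), encodeOrderPreserving(1L)) < 0);
  }
}
```

Variable-length integer encodings have the same kind of problem for a
different reason (shorter encodings are not necessarily smaller values), which
is why an arbitrary key type would need an explicitly order-preserving coder.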

My current proposal is for a multimap that is not sorted by key, but that
can support ordered values for a single key. Remember that a multimap maps
K -> Iterable<V>, so this means that each individual Iterable is
ordered, but the keys have no specific order relative to each other. This
is not too different from many multimap implementations where the keys are
unordered, but the list of values for a single key at least has a stable
order.

The interface would look like this:

public interface MultimapState<K, V> extends State {
  // Add a value with a default timestamp.
  void put(K key, V value);

  // Add a timestamped value.
  void put(K key, TimestampedValue<V> value);

  // Remove all values for a key.
  void remove(K key);

  // Remove all values for a key with timestamps within the specified range.
  void removeRange(K key, Instant startTs, Instant endTs);

  // Get an Iterable of values for a key. The Iterable will be returned
  // sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> get(K key);

  // Get an Iterable of values for a key in the specified range. The
  // Iterable will be returned sorted by timestamp.
  ReadableState<Iterable<TimestampedValue<V>>> getRange(
      K key, Instant startTs, Instant endTs);

  ReadableState<Iterable<K>> keys();
  ReadableState<Iterable<TimestampedValue<V>>> values();
  ReadableState> entries;
}

We can of course provide helper functions that allow using MultimapState
without dealing with TimestampedValue for users who only want a multimap and
don't want sorting.

I think many users will only need a single sorted list - not a full
multimap. It's worth offering this as well, and we can simply build it on
top of MultimapState. It will look like an extension of BagState:

public interface TimestampSortedListState<T> extends State {
  void add(TimestampedValue<T> value);
  Iterable<TimestampedValue<T>> read();
  Iterable<TimestampedValue<T>> readRange(Instant startTs, Instant endTs);
  void clearRange(Instant startTs, Instant endTs);
}
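
As a rough illustration of the intended semantics only, the sorted-list
operations can be mimicked in memory with a TreeMap keyed by timestamp. This
is a sketch under assumptions, not a Beam state implementation: the class
InMemoryTimestampSortedList and its nested Stamped type are hypothetical
stand-ins, java.time.Instant is used so the snippet has no dependencies, and
ranges are assumed to be half-open, [startTs, endTs).

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory illustration only; not a Beam state implementation.
class InMemoryTimestampSortedList<T> {

  // Hypothetical stand-in for a timestamped value.
  static final class Stamped<T> {
    final Instant timestamp;
    final T value;

    Stamped(Instant timestamp, T value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Values grouped by timestamp; TreeMap keeps timestamps in ascending order.
  private final NavigableMap<Instant, List<T>> byTimestamp = new TreeMap<>();

  void add(Instant timestamp, T value) {
    byTimestamp.computeIfAbsent(timestamp, ts -> new ArrayList<>()).add(value);
  }

  // All values, ordered by timestamp.
  List<Stamped<T>> read() {
    return flatten(byTimestamp);
  }

  // Values in the assumed half-open range [startTs, endTs).
  List<Stamped<T>> readRange(Instant startTs, Instant endTs) {
    return flatten(byTimestamp.subMap(startTs, true, endTs, false));
  }

  // Removes every value in [startTs, endTs), including all values that
  // share a timestamp inside the range.
  void clearRange(Instant startTs, Instant endTs) {
    byTimestamp.subMap(startTs, true, endTs, false).clear();
  }

  private List<Stamped<T>> flatten(NavigableMap<Instant, List<T>> map) {
    List<Stamped<T>> out = new ArrayList<>();
    for (Map.Entry<Instant, List<T>> entry : map.entrySet()) {
      for (T value : entry.getValue()) {
        out.add(new Stamped<>(entry.getKey(), value));
      }
    }
    return out;
  }
}
```

Note that subMap throws IllegalArgumentException when startTs is after endTs,
so a real API would need to decide how to treat an inverted range.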


On Wed, Jun 17, 2020 at 2:47 PM Luke Cwik  wrote:

> The portability layer is meant to live across multiple versions of Beam,
> and I don't think we should approach it by just doing the simple and useful
> thing now, since I believe that will lead to a proliferation of the API.
>
> On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles  wrote:
>
>> I have thoughts on the subject of whether to have APIs just for the
>> lowest-level building blocks versus having APIs for higher-level
>> constructs. Specifically this applies to providing only unsorted multimap
>> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on
>> time-ordered buffer; if it turns out to be easy to go all the way to sorted
>> multimap that's nice-to-have; if it turns out to be easy to implement on
>> top of unsorted map state that should probably be under the hood
>>
>> Reasons to build low-level multimap in the runner & fn api and layer
>> higher-level things in the SDK:
>>
>>  - It is less implementation for runners if they have to only provide
>> fewer lower-level building blocks like multimap state.
>>  - There are many more runners than SDKs (and will be even more and more)
>> so this saves overall.
>>
>> Reasons to build higher-level constructs directly in the runner and fn
>> api:
>>
>>  - Having multiple higher-level state types may actually be less
>> implementation than one complex state type, especially if they map to
>> runner primitives.
>>  - The runner may have better specialized implementations, especially for
>> something like a time-ordered buffer.
>>  - The particular access patterns in an SDK-based implementation may not
>> be ideal for each runner's underlying implementation of the low-level
>> building block.
>>  - There may be excessive gRPC overhead even for optimal access patterns.
>>
>> There are ways to have best of both worlds, like:
>>
>> 1. Define multiple state types according to fundamental access patterns,
>> like we did this before portability.
>> 2. If it is easy to layer one on top of the other, do that inside the
>> runner. Provide shared code so for runners providing the lowest-level
>> primitive they get all the types for free.
>>
>> I understand that 

Re: DISCUSS: Sorted MapState API

2020-06-17 Thread Luke Cwik
The portability layer is meant to live across multiple versions of Beam, and
I don't think we should approach it by just doing the simple and useful thing
now, since I believe that will lead to a proliferation of the API.

On Wed, Jun 17, 2020 at 2:30 PM Kenneth Knowles  wrote:

> I have thoughts on the subject of whether to have APIs just for the
> lowest-level building blocks versus having APIs for higher-level
> constructs. Specifically this applies to providing only unsorted multimap
> vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on
> time-ordered buffer; if it turns out to be easy to go all the way to sorted
> multimap that's nice-to-have; if it turns out to be easy to implement on
> top of unsorted map state that should probably be under the hood
>
> Reasons to build low-level multimap in the runner & fn api and layer
> higher-level things in the SDK:
>
>  - It is less implementation for runners if they have to only provide
> fewer lower-level building blocks like multimap state.
>  - There are many more runners than SDKs (and will be even more and more)
> so this saves overall.
>
> Reasons to build higher-level constructs directly in the runner and fn api:
>
>  - Having multiple higher-level state types may actually be less
> implementation than one complex state type, especially if they map to
> runner primitives.
>  - The runner may have better specialized implementations, especially for
> something like a time-ordered buffer.
>  - The particular access patterns in an SDK-based implementation may not
> be ideal for each runner's underlying implementation of the low-level
> building block.
>  - There may be excessive gRPC overhead even for optimal access patterns.
>
> There are ways to have best of both worlds, like:
>
> 1. Define multiple state types according to fundamental access patterns,
> like we did this before portability.
> 2. If it is easy to layer one on top of the other, do that inside the
> runner. Provide shared code so for runners providing the lowest-level
> primitive they get all the types for free.
>
> I understand that this is an oversimplification. It still creates some
> more work. And APIs are a burden so it is good to introduce as few as
> possible for maintenance. But it has performance benefits and also unblocks
> "just doing the simple and useful thing now" which I always like to do as
> long as it is compatible with future changes. If the APIs are fundamental,
> like sets, maps, timestamp ordering, then it is safe to guess that they
> will change rarely and be useful forever.
>
> Kenn
>
> On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik  wrote:
>
>> I would be glad to take a stab at how to provide sorting on top of
>> unsorted multimap state.
>> Based upon your description, you want integer keys representing
>> timestamps and arbitrary user value for the values, is that correct?
>> What kinds of operations do you need on the sorted map state in order of
>> efficiency requirements?
>> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
>> What kinds of operations do we expect the underlying unsorted map state
>> to be able to provide?
>> (at a minimum Get(K), Append(K), Clear(K) but what else e.g.
>> enumerate(K)?)
>>
>> I went through a similar exercise of how to provide a list like side
>> input view over a multimap[1] side input which efficiently allowed
>> computation of size and provided random access while only having access to
>> get(K) and enumerate K's.
>>
>> 1:
>> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>>
>> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax  wrote:
>>
>>> Bringing this subject up again,
>>>
>>> I've spent some time looking into implementing this for the Dataflow
>>> runner. I'm unable to find a way to implement the arbitrary sorted multimap
>>> efficiently for the case where there are large numbers of unique keys.
>>> Since the primary driving use case is timestamp ordering (i.e. key is event
>>> timestamp), you would expect to have nearly a new key per element. I
>>> considered Luke's suggestion above, but unfortunately it doesn't really
>>> solve this issue.
>>>
>>> The primary use case for sorting always seems to be sorting by
>>> timestamp. I want to propose that instead of building the fully-general
>>> sorted multimap, we instead focus on a state type where the sort key is an
>>> integral type (like a timestamp or an integer). There is still a valid use
>>> case for multimap, but we can provide that as an unordered state. At least
>>> for Dataflow, it will be much easier.
>>>
>>> While my difficulties here may be specific to the Dataflow runner, any
>>> such support would have to be built into other runners as well, and
>>> limiting to integral sorting likely makes it easier for other runners to
>>> implement this. Also, if you look at this
>>> 

Re: DISCUSS: Sorted MapState API

2020-06-17 Thread Kenneth Knowles
I have thoughts on the subject of whether to have APIs just for the
lowest-level building blocks versus having APIs for higher-level
constructs. Specifically this applies to providing only unsorted multimap
vs what I will call "time-ordered buffer". TL;DR: I'd vote to focus on
time-ordered buffer; if it turns out to be easy to go all the way to sorted
multimap that's nice-to-have; if it turns out to be easy to implement on
top of unsorted map state, that should probably be under the hood.

Reasons to build low-level multimap in the runner & fn api and layer
higher-level things in the SDK:

 - It is less implementation for runners if they have to only provide fewer
lower-level building blocks like multimap state.
 - There are many more runners than SDKs (and will be even more and more)
so this saves overall.

Reasons to build higher-level constructs directly in the runner and fn api:

 - Having multiple higher-level state types may actually be less
implementation than one complex state type, especially if they map to
runner primitives.
 - The runner may have better specialized implementations, especially for
something like a time-ordered buffer.
 - The particular access patterns in an SDK-based implementation may not be
ideal for each runner's underlying implementation of the low-level building
block.
 - There may be excessive gRPC overhead even for optimal access patterns.

There are ways to have best of both worlds, like:

1. Define multiple state types according to fundamental access patterns,
as we did before portability.
2. If it is easy to layer one on top of the other, do that inside the
runner. Provide shared code so for runners providing the lowest-level
primitive they get all the types for free.

I understand that this is an oversimplification. It still creates some more
work. And APIs are a burden so it is good to introduce as few as possible
for maintenance. But it has performance benefits and also unblocks "just
doing the simple and useful thing now" which I always like to do as long as
it is compatible with future changes. If the APIs are fundamental, like
sets, maps, timestamp ordering, then it is safe to guess that they will
change rarely and be useful forever.

Kenn

On Tue, Jun 16, 2020 at 2:54 PM Luke Cwik  wrote:

> I would be glad to take a stab at how to provide sorting on top of
> unsorted multimap state.
> Based upon your description, you want integer keys representing timestamps
> and arbitrary user value for the values, is that correct?
> What kinds of operations do you need on the sorted map state in order of
> efficiency requirements?
> (e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
> What kinds of operations do we expect the underlying unsorted map state to
> be able to provide?
> (at a minimum Get(K), Append(K), Clear(K) but what else e.g. enumerate(K)?)
>
> I went through a similar exercise of how to provide a list like side input
> view over a multimap[1] side input which efficiently allowed computation of
> size and provided random access while only having access to get(K) and
> enumerate K's.
>
> 1:
> https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568
>
> On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax  wrote:
>
>> Bringing this subject up again,
>>
>> I've spent some time looking into implementing this for the Dataflow
>> runner. I'm unable to find a way to implement the arbitrary sorted multimap
>> efficiently for the case where there are large numbers of unique keys.
>> Since the primary driving use case is timestamp ordering (i.e. key is event
>> timestamp), you would expect to have nearly a new key per element. I
>> considered Luke's suggestion above, but unfortunately it doesn't really
>> solve this issue.
>>
>> The primary use case for sorting always seems to be sorting by timestamp.
>> I want to propose that instead of building the fully-general sorted
>> multimap, we instead focus on a state type where the sort key is an
>> integral type (like a timestamp or an integer). There is still a valid use
>> case for multimap, but we can provide that as an unordered state. At least
>> for Dataflow, it will be much easier.
>>
>> While my difficulties here may be specific to the Dataflow runner, any
>> such support would have to be built into other runners as well, and
>> limiting to integral sorting likely makes it easier for other runners to
>> implement this. Also, if you look at this Flink comment pointed out by
>> Aljoscha, for Flink the main use case identified was
>> also timestamp sorting. This will also simplify the API design for this
>> feature: Sorted multimap with arbitrary keys would require us to introduce
>> a way of 

Re: DISCUSS: Sorted MapState API

2020-06-16 Thread Luke Cwik
I would be glad to take a stab at how to provide sorting on top of unsorted
multimap state.
Based upon your description, you want integer keys representing timestamps
and arbitrary user value for the values, is that correct?
What kinds of operations do you need on the sorted map state in order of
efficiency requirements?
(e.g. Next(x), Previous(x), GetAll(Range[x, y)), ClearAll(Range[x, y)))
What kinds of operations do we expect the underlying unsorted map state to
be able to provide?
(at a minimum Get(K), Append(K), Clear(K) but what else e.g. enumerate(K)?)

I went through a similar exercise of how to provide a list-like side input
view over a multimap[1] side input which efficiently allowed computation of
size and provided random access while only having access to get(K) and
enumerate K's.

1:
https://github.com/lukecwik/incubator-beam/blob/ec8769f6163ca8a4daecc2fb29708bc1da430917/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java#L568

On Tue, Jun 16, 2020 at 8:47 AM Reuven Lax  wrote:

> Bringing this subject up again,
>
> I've spent some time looking into implementing this for the Dataflow
> runner. I'm unable to find a way to implement the arbitrary sorted multimap
> efficiently for the case where there are large numbers of unique keys.
> Since the primary driving use case is timestamp ordering (i.e. key is event
> timestamp), you would expect to have nearly a new key per element. I
> considered Luke's suggestion above, but unfortunately it doesn't really
> solve this issue.
>
> The primary use case for sorting always seems to be sorting by timestamp.
> I want to propose that instead of building the fully-general sorted
> multimap, we instead focus on a state type where the sort key is an
> integral type (like a timestamp or an integer). There is still a valid use
> case for multimap, but we can provide that as an unordered state. At least
> for Dataflow, it will be much easier.
>
> While my difficulties here may be specific to the Dataflow runner, any
> such support would have to be built into other runners as well, and
> limiting to integral sorting likely makes it easier for other runners to
> implement this. Also, if you look at this Flink comment pointed out by
> Aljoscha, for Flink the main use case identified was
> also timestamp sorting. This will also simplify the API design for this
> feature: Sorted multimap with arbitrary keys would require us to introduce
> a way of mapping natural ordering to encoded ordering (i.e. a new
> OrderPreservingCoder), but if we limit sort keys to integral types, the API
> design is simpler as integral types can be represented directly.
>
> Reuven
>
> On Sun, Jun 2, 2019 at 7:04 AM Reuven Lax  wrote:
>
>> This sounds to me like a potential runner strategy. However if a runner
>> can natively support sorted maps (e.g. we expect the Dataflow runner to be
>> able to do so, and I think it would be useful for other runners as well),
>> then it's probably preferable to allow the runner to use its native
>> capabilities.
>>
>> On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik  wrote:
>>
>>> For the API that you proposed, the map key is always "void" and the sort
>>> key == user key. So in my example of
>>> key: dummy value
>>> key.000: token, (0001, value4)
>>> key.001: token, (0010, value1), (0011, value2)
>>> key.01: token
>>> key.1: token, (1011, value3)
>>> you would have:
>>> "void": dummy value
>>> "void".000: token, (0001, value4)
>>> "void".001: token, (0010, value1), (0011, value2)
>>> "void".01: token
>>> "void".1: token, (1011, value3)
>>>
>>> Iterable> entriesUntil(K limit) translates into walking the
>>> prefixes until you find a common prefix for K and then filter for values
>>> where they have a sort key <= K. Using the example above, to find
>>> entriesUntil(0010) you would:
>>> look for key."", miss
>>> look for key.0, miss
>>> look for key.00, miss
>>> look for key.000, hit, sort all contained values using secondary key,
>>> provide value4 to user
>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort
>>> all contained values using secondary key, filter out value2 and provide
>>> value1
>>>
>>> void removeUntil(K limit) also translates into walking the prefixes but
>>> instead we will clear them when we have a "hit" with some special logic for
>>> when the sort key is a prefix of the key. Using the example, to
>>> removeUntil(0010) you would:
>>> look for key."", miss
>>> look for key.0, miss
>>> look for key.00, miss
>>> look for key.000, hit, clear
>>> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort
>>> all contained values using secondary key, store in memory all values that
>>> are > 0010, clear, and append the values stored in memory.
>>>
>>> On Fri, May 24, 2019 at 10:36 AM Reuven Lax  wrote:

Re: DISCUSS: Sorted MapState API

2019-06-02 Thread Reuven Lax
This sounds to me like a potential runner strategy. However if a runner can
natively support sorted maps (e.g. we expect the Dataflow runner to be able
to do so, and I think it would be useful for other runners as well), then
it's probably preferable to allow the runner to use its native capabilities.

On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik  wrote:

> For the API that you proposed, the map key is always "void" and the sort
> key == user key. So in my example of
> key: dummy value
> key.000: token, (0001, value4)
> key.001: token, (0010, value1), (0011, value2)
> key.01: token
> key.1: token, (1011, value3)
> you would have:
> "void": dummy value
> "void".000: token, (0001, value4)
> "void".001: token, (0010, value1), (0011, value2)
> "void".01: token
> "void".1: token, (1011, value3)
>
> Iterable> entriesUntil(K limit) translates into walking the
> prefixes until you find a common prefix for K and then filter for values
> where they have a sort key <= K. Using the example above, to find
> entriesUntil(0010) you would:
> look for key."", miss
> look for key.0, miss
> look for key.00, miss
> look for key.000, hit, sort all contained values using secondary key,
> provide value4 to user
> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
> contained values using secondary key, filter out value2 and provide value1
>
> void removeUntil(K limit) also translates into walking the prefixes but
> instead we will clear them when we have a "hit" with some special logic for
> when the sort key is a prefix of the key. Using the example, to
> removeUntil(0010) you would:
> look for key."", miss
> look for key.0, miss
> look for key.00, miss
> look for key.000, hit, clear
> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
> contained values using secondary key, store in memory all values that are >
> 0010, clear, and append the values stored in memory.
>
> On Fri, May 24, 2019 at 10:36 AM Reuven Lax  wrote:
>
>> Can you explain how fetching and deleting ranges of keys would work with
>> this data structure?
>>
>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik  wrote:
>>
>>> Reuven, for the example, I assume that we never want to store more than
>>> 2 values at a given sort key prefix, and if we do then we will create a new
>>> longer prefix splitting up the values based upon the sort key.
>>>
>>> Tuple representation in examples below is (key, sort key, value) and .
>>> is a character outside of the alphabet which can be represented by using an
>>> escaping encoding that wraps the key + sort key encoding.
>>>
>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of
>>> 0010, finding one that is not empty. In this case it is "", so we append
>>> the value to the map at key."", ending up with (we also set the key to a
>>> dummy value to know that it contains values):
>>> key: dummy value
>>> key."": token, (0010, value1)
>>> Now we insert (key, 0011, value2). We again look up "key" + all the
>>> prefixes of 0011, finding "", so we append value2 to key."", ending up with:
>>> key: dummy value
>>> key."": token, (0010, value1), (0011, value2)
>>> Now we insert (key, 1011, value3), we again lookup "key" + all the
>>> prefixes of 1011 finding "" but notice that it is full, so we partition all
>>> the values into two prefixes 0 and 1. We also clear the "" prefix ending up
>>> with:
>>> key: dummy value
>>> key.0: token, (0010, value1), (0011, value2)
>>> key.1: token, (1011, value3)
>>> Now we insert (key, 0001, value4), we again lookup "key" + all the
>>> prefixes of the value finding 0 but notice that it is full, so we partition
>>> all the values into two prefixes 00 and 01 but notice this doesn't help us
>>> since 00 will be too full so we split 00 again to 000, 001. We also clear
>>> the 0 prefix ending up with:
>>> key: dummy value
>>> key.000: token, (0001, value4)
>>> key.001: token, (0010, value1), (0011, value2)
>>> key.01: token
>>> key.1: token, (1011, value3)
>>>
>>> We are effectively building a trie[1] where we only have values at the
>>> leaves and control how full each leaf can be. There are other trie
>>> representations like a radix tree that may be better.
>>>
>>> Looking up the values in sorted order for "key" would go like this:
>>> Is key set, yes
>>> look for key."", miss
>>> look for key.0, miss
>>> look for key.00, miss
>>> look for key.000, hit, sort all contained values using secondary key,
>>> provide value4 to user
>>> look for key.001, hit, sort all contained values using secondary key,
>>> provide value1 followed by value2 to user
>>> look for key.01, hit, empty, return no values to user
>>> look for key.1, hit, sort all contained values using secondary key,
>>> provide value3 to user
>>> we have walked the entire prefix space, signal end of iterable
>>>
>>> Some notes for the above:
>>> * The dummy value is used to know that the key contains values and the
>>> token is to know whether there are any values deeper in the trie so when we
>>> 

Re: DISCUSS: Sorted MapState API

2019-05-30 Thread Kenneth Knowles
On Tue, May 28, 2019 at 2:59 AM Robert Bradshaw  wrote:

> On Fri, May 24, 2019 at 6:57 PM Kenneth Knowles  wrote:
> >
> > On Fri, May 24, 2019 at 9:51 AM Kenneth Knowles  wrote:
> >>
> >> On Fri, May 24, 2019 at 8:14 AM Reuven Lax  wrote:
> >>>
> >>> Some great comments!
> >>>
> >>> Aljoscha: absolutely this would have to be implemented by runners to
> be efficient. We can of course provide a default (inefficient)
> implementation, but ideally runners would provide better ones.
> >>>
> >>> Jan Exactly. I think MapState can be dropped or backed by this. E.g.
> >>>
> >>> Robert Great point about standard coders not satisfying this. That's
> why I suggested that we provide a way to tag the coders that do preserve
> order, and only accept those as key coders. Alternatively we could present a
> more limited API - e.g. only allowing a hard-coded set of types to be used
> as keys - but that seems counter to the direction Beam usually goes.
> >>
> >>
> >> I think we got it right with GroupByKey: the encoded form of a key is
> authoritative/portable.
> >>
> >> Instead of seeing the in-language type as the "real" value and the
> coder a way to serialize it, the portable encoded bytestring is the "real"
> value and the representation in a particular SDK is the responsibility of
> the SDK.
> >>
> >> This is very important, because many in-language representations,
> especially low-level representations, do not have the desired equality. For
> example, Java arrays. Coder.structuralValue(...) is required to have
> equality that matches the equality of the encoded form. It can be a noop if
> the in-language equality already matches. Or it can be a full encoding if
> there is not a more efficient option. I think we could add another method
> "lexicalValue" or add the requirement that structuralValue also sort
> equivalently to the wire format.
> >>
> >> Now, since many of our wire formats do not sort in the natural
> mathematical order, SDKs should help users avoid the pitfall of using
> these, as we do for GBK by checking "determinism" of the coder. Note that I
> am *not* referring to the order they would sort in any particular
> programming language implementation.
> >
> >
> > Another related feature request. Today we do this:
> >
> > (1) infer any coder for a type
> > (2) crash if it is not suitable for GBK
> >
> > I have proposed in the past that instead we:
> >
> > (1) notice that a PCollection is input to a GBK
> > (2) infer an appropriate coder for a type that will work with GBK
> >
> > This generalizes to the idea of registering multiple coders for
> different purposes, particularly inferring a coder that has good lexical
> sorting.
> >
> > I don't recall that there was any objection, but neither I nor anyone
> else has gotten around to working on this. It does have the problem that it
> could change the coders and impact pipeline update. I suggest that update
> is fragile enough that we should develop pipeline options that allow opt-in
> to improvements without a major version bump.
>
> It also has the difficulty that coders cannot be inferred until one
> knows all their consumers (which breaks a transform being able to
> inspect or act on the coder(s) of its input(s)).
>
> Possibly, if we move to something more generic, like specifying
> element types/schemas on collections and then doing
> whole-pipeline-analysis to infer all the coders, this would be
> solvable (but potentially backwards incompatible, and fraught with
> difficulties if users (aka transform authors) specify their own coders
> (e.g. when inference fails, or as part of inference code)).
>
> Another way to look at this is that there are certain transformations
> ("upgrades") one can do to coders when they need certain properties,
> which change the encoded form but preserve the types. Upgrading a
> coder C to its length-prefixed one is one such operation. Upgrading a
> coder to a deterministic version, or a natural-order-preserving one,
> are two other possible transformations (which should be idempotent,
> may be the identity, and may be an error).
>

You pose a good problem, and a reasonable solution. Since on most (all?)
runners this coder conversion will occur in the middle of a fused stage, it
should have near-zero cost. And if it has a standard URN + payload it could
even be optimized away entirely and/or inform optimization decisions.

Ideally, there'd be fewer transforms acting on the coders of their inputs.
Today I think it is mostly to push them around when coder inference fails.
In that case, I've hypothesized that we could do elementary constraint
solving like any type inferencer does, keeping unspecified coders as
"variables" until after construction. But that's a sizable change now that
we've gone down the current path, and not enough clear benefit I suppose.
If we started from scratch I'd start with that probably.

Kenn


Re: DISCUSS: Sorted MapState API

2019-05-28 Thread Robert Bradshaw
On Fri, May 24, 2019 at 6:57 PM Kenneth Knowles  wrote:
>
> On Fri, May 24, 2019 at 9:51 AM Kenneth Knowles  wrote:
>>
>> On Fri, May 24, 2019 at 8:14 AM Reuven Lax  wrote:
>>>
>>> Some great comments!
>>>
>>> Aljoscha: absolutely this would have to be implemented by runners to be 
>>> efficient. We can of course provide a default (inefficient) implementation, 
>>> but ideally runners would provide better ones.
>>>
>>> Jan Exactly. I think MapState can be dropped or backed by this. E.g.
>>>
>>> Robert Great point about standard coders not satisfying this. That's why I 
>>> suggested that we provide a way to tag the coders that do preserve order, 
>>> and only accept those as key coders. Alternatively we could present a more 
>>> limited API - e.g. only allowing a hard-coded set of types to be used as 
>>> keys - but that seems counter to the direction Beam usually goes.
>>
>>
>> I think we got it right with GroupByKey: the encoded form of a key is 
>> authoritative/portable.
>>
>> Instead of seeing the in-language type as the "real" value and the coder a 
>> way to serialize it, the portable encoded bytestring is the "real" value and 
>> the representation in a particular SDK is the responsibility of the SDK.
>>
>> This is very important, because many in-language representations, especially 
>> low-level representations, do not have the desired equality. For example, 
>> Java arrays. Coder.structuralValue(...) is required to have equality that 
>> matches the equality of the encoded form. It can be a noop if the 
>> in-language equality already matches. Or it can be a full encoding if there 
>> is not a more efficient option. I think we could add another method 
>> "lexicalValue" or add the requirement that structuralValue also sort 
>> equivalently to the wire format.
>>
>> Now, since many of our wire formats do not sort in the natural mathematical 
>> order, SDKs should help users avoid the pitfall of using these, as we do for 
>> GBK by checking "determinism" of the coder. Note that I am *not* referring 
>> to the order they would sort in any particular programming language 
>> implementation.
>
>
> Another related feature request. Today we do this:
>
> (1) infer any coder for a type
> (2) crash if it is not suitable for GBK
>
> I have proposed in the past that instead we:
>
> (1) notice that a PCollection is input to a GBK
> (2) infer an appropriate coder for a type that will work with GBK
>
> This generalizes to the idea of registering multiple coders for different 
> purposes, particularly inferring a coder that has good lexical sorting.
>
> I don't recall that there was any objection, but neither I nor anyone else 
> has gotten around to working on this. It does have the problem that it could 
> change the coders and impact pipeline update. I suggest that update is 
> fragile enough that we should develop pipeline options that allow opt-in to 
> improvements without a major version bump.

It also has the difficulty that coders cannot be inferred until one
knows all their consumers (which breaks a transform being able to
inspect or act on the coder(s) of its input(s)).

Possibly, if we move to something more generic, like specifying
element types/schemas on collections and then doing
whole-pipeline-analysis to infer all the coders, this would be
solvable (but potentially backwards incompatible, and fraught with
difficulties if users (aka transform authors) specify their own coders
(e.g. when inference fails, or as part of inference code)).

Another way to look at this is that there are certain transformations
("upgrades") one can do to coders when they need certain properties,
which change the encoded form but preserve the types. Upgrading a
coder C to its length-prefixed one is one such operation. Upgrading a
coder to a deterministic version, or a natural-order-preserving one,
are two other possible transformations (which should be idempotent,
may be the identity, and may be an error).
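
To make the "natural-order-preserving" upgrade concrete, here is a minimal
sketch for long keys. It is an illustration only, not an existing Beam coder:
the class and method names are made up, and the varint function is a generic
LEB128-style encoding included for contrast rather than a claim about any
particular Beam wire format. The idea is that a fixed-width big-endian
encoding with the sign bit flipped compares, byte by byte, in the same order
as the numbers themselves, while a variable-length encoding generally does not.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical helper, for illustration only. */
class OrderPreservingLongSketch {

    /** Big-endian with the sign bit flipped, so negatives sort before positives. */
    static byte[] encode(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
    }

    /** Inverse of encode. */
    static long decode(byte[] b) {
        return ByteBuffer.wrap(b).getLong() ^ Long.MIN_VALUE;
    }

    /** Unsigned lexicographic comparison, as a byte-oriented store would sort (Java 9+). */
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    /** Generic LEB128-style varint for non-negative values; shown only for contrast. */
    static byte[] varint(long v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & ~0x7FL) != 0) {
            out.write((int) ((v & 0x7F) | 0x80)); // low 7 bits plus continuation bit
            v >>>= 7;
        }
        out.write((int) v);
        return out.toByteArray();
    }
}
```

With this sketch, compareBytes(encode(255), encode(256)) is negative as
expected, whereas compareBytes(varint(255), varint(256)) is positive, because
the varint of 255 starts with byte 0xFF and the varint of 256 starts with 0x80.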


Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Lukasz Cwik
In my "look for" steps, it should have said "void". instead of "key". when
explaining how to do it.

On Fri, May 24, 2019 at 11:05 AM Lukasz Cwik  wrote:

> For the API that you proposed, the map key is always "void" and the sort
> key == user key. So in my example of
> key: dummy value
> key.000: token, (0001, value4)
> key.001: token, (0010, value1), (0011, value2)
> key.01: token
> key.1: token, (1011, value3)
> you would have:
> "void": dummy value
> "void".000: token, (0001, value4)
> "void".001: token, (0010, value1), (0011, value2)
> "void".01: token
> "void".1: token, (1011, value3)
>
> Iterable> entriesUntil(K limit) translates into walking the
> prefixes until you find a common prefix for K and then filter for values
> where they have a sort key <= K. Using the example above, to find
> entriesUntil(0010) you would:
> look for key."", miss
> look for key.0, miss
> look for key.00, miss
> look for key.000, hit, sort all contained values using secondary key,
> provide value4 to user
> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
> contained values using secondary key, filter out value2 and provide value1
>
> void removeUntil(K limit) also translates into walking the prefixes but
> instead we will clear them when we have a "hit" with some special logic for
> when the sort key is a prefix of the key. Using the example, to
> removeUntil(0010) you would:
> look for key."", miss
> look for key.0, miss
> look for key.00, miss
> look for key.000, hit, clear
> look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
> contained values using secondary key, store in memory all values that are >
> 0010, clear, and append the values stored in memory.
>
> On Fri, May 24, 2019 at 10:36 AM Reuven Lax  wrote:
>
>> Can you explain how fetching and deleting ranges of keys would work with
>> this data structure?
>>
>> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik  wrote:
>>
>>> Reuven, for the example, I assume that we never want to store more than
>>> 2 values at a given sort key prefix, and if we do then we will create a new
>>> longer prefix splitting up the values based upon the sort key.
>>>
>>> Tuple representation in examples below is (key, sort key, value) and .
>>> is a character outside of the alphabet which can be represented by using an
>>> escaping encoding that wraps the key + sort key encoding.
>>>
>>> To insert (key, 0010, value1), we look up "key" + all the prefixes of
>>> 0010, finding one that is not empty. In this case it is "", so we append
>>> the value to the map at key."", ending up with (we also set the key to a
>>> dummy value to know that it contains values):
>>> key: dummy value
>>> key."": token, (0010, value1)
>>> Now we insert (key, 0011, value2). We again look up "key" + all the
>>> prefixes of 0011, finding "", so we append value2 to key."", ending up with:
>>> key: dummy value
>>> key."": token, (0010, value1), (0011, value2)
>>> Now we insert (key, 1011, value3), we again lookup "key" + all the
>>> prefixes of 1011 finding "" but notice that it is full, so we partition all
>>> the values into two prefixes 0 and 1. We also clear the "" prefix ending up
>>> with:
>>> key: dummy value
>>> key.0: token, (0010, value1), (0011, value2)
>>> key.1: token, (1011, value3)
>>> Now we insert (key, 0001, value4), we again lookup "key" + all the
>>> prefixes of the value finding 0 but notice that it is full, so we partition
>>> all the values into two prefixes 00 and 01 but notice this doesn't help us
>>> since 00 will be too full so we split 00 again to 000, 001. We also clear
>>> the 0 prefix ending up with:
>>> key: dummy value
>>> key.000: token, (0001, value4)
>>> key.001: token, (0010, value1), (0011, value2)
>>> key.01: token
>>> key.1: token, (1011, value3)
>>>
>>> We are effectively building a trie[1] where we only have values at the
>>> leaves and control how full each leaf can be. There are other trie
>>> representations like a radix tree that may be better.
>>>
>>> Looking up the values in sorted order for "key" would go like this:
>>> Is key set, yes
>>> look for key."", miss
>>> look for key.0, miss
>>> look for key.00, miss
>>> look for key.000, hit, sort all contained values using secondary key,
>>> provide value4 to user
>>> look for key.001, hit, sort all contained values using secondary key,
>>> provide value1 followed by value2 to user
>>> look for key.01, hit, empty, return no values to user
>>> look for key.1, hit, sort all contained values using secondary key,
>>> provide value3 to user
>>> we have walked the entire prefix space, signal end of iterable
>>>
>>> Some notes for the above:
>>> * The dummy value is used to know that the key contains values and the
>>> token is to know whether there are any values deeper in the trie, so that
>>> we know when to stop searching.
>>> * If we can recalculate the sort key from the combination of the key and
>>> value, then we don't need to store it.
>>> * Keys with lots of values will perform worse than keys with 

Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Lukasz Cwik
For the API that you proposed, the map key is always "void" and the sort
key == user key. So in my example of
key: dummy value
key.000: token, (0001, value4)
key.001: token, (0010, value1), (0011, value2)
key.01: token
key.1: token, (1011, value3)
you would have:
"void": dummy value
"void".000: token, (0001, value4)
"void".001: token, (0010, value1), (0011, value2)
"void".01: token
"void".1: token, (1011, value3)

Iterable> entriesUntil(K limit) translates into walking the
prefixes until you find a common prefix for K and then filter for values
where they have a sort key <= K. Using the example above, to find
entriesUntil(0010) you would:
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, sort all contained values using secondary key,
provide value4 to user
look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
contained values using secondary key, filter out value2 and provide value1

void removeUntil(K limit) also translates into walking the prefixes but
instead we will clear them when we have a "hit" with some special logic for
when the sort key is a prefix of the key. Using the example, to
removeUntil(0010) you would:
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, clear
look for key.001, hit, notice that 001 is a prefix of 0010 so we sort all
contained values using secondary key, store in memory all values that are >
0010, clear, and append the values stored in memory.
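
The two walks above can be sketched in code as follows. This is only a rough
in-memory illustration under several assumptions: sort keys are fixed-width
bit strings, a HashMap stands in for the underlying map state, a leaf holds at
most 2 entries as in the example, and the dummy value and token are left out
because the in-memory map can tell a miss from an empty leaf directly. The
class and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: leaf buckets keyed by bit-string prefix. */
class PrefixBucketSketch {
    static final int MAX_PER_LEAF = 2; // same limit as the example in this thread

    // prefix -> (sortKey, value) pairs; stands in for the "void".<prefix> entries
    private final Map<String, List<String[]>> leaves = new HashMap<>();

    PrefixBucketSketch() {
        leaves.put("", new ArrayList<>());
    }

    void insert(String sortKey, String value) {
        String prefix = "";
        while (!leaves.containsKey(prefix)) { // miss: try the next longer prefix
            prefix = sortKey.substring(0, prefix.length() + 1);
        }
        leaves.get(prefix).add(new String[] {sortKey, value});
        split(prefix);
    }

    private void split(String prefix) {
        List<String[]> entries = leaves.get(prefix);
        if (entries.size() <= MAX_PER_LEAF || prefix.length() == entries.get(0)[0].length()) {
            return; // not full, or no bits left to split on
        }
        leaves.remove(prefix);
        leaves.put(prefix + "0", new ArrayList<>());
        leaves.put(prefix + "1", new ArrayList<>());
        for (String[] e : entries) {
            leaves.get(prefix + e[0].charAt(prefix.length())).add(e);
        }
        split(prefix + "0"); // one side may still be too full
        split(prefix + "1");
    }

    /** Values with sort key <= limit, in sort-key order. */
    List<String> entriesUntil(String limit) {
        List<String> out = new ArrayList<>();
        walk("", limit, out, false);
        return out;
    }

    /** Drops every entry with sort key <= limit; emptied leaves stay in place. */
    void removeUntil(String limit) {
        walk("", limit, new ArrayList<>(), true);
    }

    private void walk(String prefix, String limit, List<String> out, boolean remove) {
        if (prefix.compareTo(limit.substring(0, prefix.length())) > 0) {
            return; // every key under this prefix is greater than the limit
        }
        List<String[]> entries = leaves.get(prefix);
        if (entries == null) { // miss: descend into both children, smaller first
            walk(prefix + "0", limit, out, remove);
            walk(prefix + "1", limit, out, remove);
        } else if (remove) { // hit: keep only the entries beyond the limit
            entries.removeIf(e -> e[0].compareTo(limit) <= 0);
        } else { // hit: emit entries up to the limit in sort-key order
            entries.sort(Comparator.comparing((String[] e) -> e[0]));
            for (String[] e : entries) {
                if (e[0].compareTo(limit) <= 0) {
                    out.add(e[1]);
                }
            }
        }
    }
}
```

Inserting 0010, 0011, 1011 and 0001 in that order produces the leaves 000,
001, 01 and 1 from the example, and entriesUntil("0010") then visits 000 and
001 and returns value4 followed by value1.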

On Fri, May 24, 2019 at 10:36 AM Reuven Lax  wrote:

> Can you explain how fetching and deleting ranges of keys would work with
> this data structure?
>
> On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik  wrote:
>
>> Reuven, for the example, I assume that we never want to store more than 2
>> values at a given sort key prefix, and if we do then we will create a new
>> longer prefix splitting up the values based upon the sort key.
>>
>> Tuple representation in examples below is (key, sort key, value) and . is
>> a character outside of the alphabet which can be represented by using an
>> escaping encoding that wraps the key + sort key encoding.
>>
>> To insert (key, 0010, value1), we look up "key" + all the prefixes of 0010,
>> finding one that is not empty. In this case it is "", so we append the
>> value to the map at key."", ending up with (we also set the key to a dummy
>> value to know that it contains values):
>> key: dummy value
>> key."": token, (0010, value1)
>> Now we insert (key, 0011, value2). We again look up "key" + all the
>> prefixes of 0011, finding "", so we append value2 to key."", ending up with:
>> key: dummy value
>> key."": token, (0010, value1), (0011, value2)
>> Now we insert (key, 1011, value3), we again lookup "key" + all the
>> prefixes of 1011 finding "" but notice that it is full, so we partition all
>> the values into two prefixes 0 and 1. We also clear the "" prefix ending up
>> with:
>> key: dummy value
>> key.0: token, (0010, value1), (0011, value2)
>> key.1: token, (1011, value3)
>> Now we insert (key, 0001, value4), we again lookup "key" + all the
>> prefixes of the value finding 0 but notice that it is full, so we partition
>> all the values into two prefixes 00 and 01 but notice this doesn't help us
>> since 00 will be too full so we split 00 again to 000, 001. We also clear
>> the 0 prefix ending up with:
>> key: dummy value
>> key.000: token, (0001, value4)
>> key.001: token, (0010, value1), (0011, value2)
>> key.01: token
>> key.1: token, (1011, value3)
>>
>> We are effectively building a trie[1] where we only have values at the
>> leaves and control how full each leaf can be. There are other trie
>> representations like a radix tree that may be better.
>>
>> Looking up the values in sorted order for "key" would go like this:
>> Is key set, yes
>> look for key."", miss
>> look for key.0, miss
>> look for key.00, miss
>> look for key.000, hit, sort all contained values using secondary key,
>> provide value4 to user
>> look for key.001, hit, sort all contained values using secondary key,
>> provide value1 followed by value2 to user
>> look for key.01, hit, empty, return no values to user
>> look for key.1, hit, sort all contained values using secondary key,
>> provide value3 to user
>> we have walked the entire prefix space, signal end of iterable
>>
>> Some notes for the above:
>> * The dummy value is used to know that the key contains values and the
>> token is to know whether there are any values deeper in the trie, so that
>> we know when to stop searching.
>> * If we can recalculate the sort key from the combination of the key and
>> value, then we don't need to store it.
>> * Keys with lots of values will perform worse than keys with fewer values
>> since we have to look up more keys but they will be empty reads. The number
>> of misses can be controlled by how many elements we are willing to store at
>> a given node before we subdivide.
>>
>> In reality you could build a lot of structures (e.g. red black tree,
>> 

Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Reuven Lax
Can you explain how fetching and deleting ranges of keys would work with
this data structure?

On Fri, May 24, 2019 at 9:50 AM Lukasz Cwik  wrote:

> Reuven, for the example, I assume that we never want to store more than 2
> values at a given sort key prefix, and if we do then we will create a new
> longer prefix splitting up the values based upon the sort key.
>
> Tuple representation in examples below is (key, sort key, value) and . is
> a character outside of the alphabet which can be represented by using an
> escaping encoding that wraps the key + sort key encoding.
>
> To insert (key, 0010, value1), we look up "key" + all the prefixes of 0010,
> finding one that is not empty. In this case it is "", so we append the
> value to the map at key."", ending up with (we also set the key to a dummy
> value to know that it contains values):
> key: dummy value
> key."": token, (0010, value1)
> Now we insert (key, 0011, value2). We again look up "key" + all the
> prefixes of 0011, finding "", so we append value2 to key."", ending up with:
> key: dummy value
> key."": token, (0010, value1), (0011, value2)
> Now we insert (key, 1011, value3), we again lookup "key" + all the
> prefixes of 1011 finding "" but notice that it is full, so we partition all
> the values into two prefixes 0 and 1. We also clear the "" prefix ending up
> with:
> key: dummy value
> key.0: token, (0010, value1), (0011, value2)
> key.1: token, (1011, value3)
> Now we insert (key, 0001, value4), we again lookup "key" + all the
> prefixes of the value finding 0 but notice that it is full, so we partition
> all the values into two prefixes 00 and 01 but notice this doesn't help us
> since 00 will be too full so we split 00 again to 000, 001. We also clear
> the 0 prefix ending up with:
> key: dummy value
> key.000: token, (0001, value4)
> key.001: token, (0010, value1), (0011, value2)
> key.01: token
> key.1: token, (1011, value3)
>
> We are effectively building a trie[1] where we only have values at the
> leaves and control how full each leaf can be. There are other trie
> representations like a radix tree that may be better.
>
> Looking up the values in sorted order for "key" would go like this:
> Is key set, yes
> look for key."", miss
> look for key.0, miss
> look for key.00, miss
> look for key.000, hit, sort all contained values using secondary key,
> provide value4 to user
> look for key.001, hit, sort all contained values using secondary key,
> provide value1 followed by value2 to user
> look for key.01, hit, empty, return no values to user
> look for key.1, hit, sort all contained values using secondary key,
> provide value3 to user
> we have walked the entire prefix space, signal end of iterable
>
> Some notes for the above:
> * The dummy value is used to know that the key contains values and the
> token is to know whether there are any values deeper in the trie, so that
> we know when to stop searching.
> * If we can recalculate the sort key from the combination of the key and
> value, then we don't need to store it.
> * Keys with lots of values will perform worse than keys with fewer values
> since we have to look up more keys but they will be empty reads. The number
> of misses can be controlled by how many elements we are willing to store at
> a given node before we subdivide.
>
> In reality you could build a lot of structures (e.g. red black tree,
> binary tree) using the sort key, the issue is the cost of
> rebalancing/re-organizing the structure in map form and whether it has a
> convenient pre-order traversal for lookups.
>
>
>
> On Fri, May 24, 2019 at 8:14 AM Reuven Lax  wrote:
>
>> Some great comments!
>>
>> *Aljoscha*: absolutely this would have to be implemented by runners to
>> be efficient. We can of course provide a default (inefficient)
>> implementation, but ideally runners would provide better ones.
>>
>> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>>
>> *Robert* Great point about standard coders not satisfying this. That's
>> why I suggested that we provide a way to tag the coders that do preserve
>> order, and only accept those as key coders. Alternatively we could present a
>> more limited API - e.g. only allowing a hard-coded set of types to be used
>> as keys - but that seems counter to the direction Beam usually goes. So
>> users will have two ways of creating multimap state specs:
>>
>>private final StateSpec> state =
>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>
>> or
>>private final StateSpec> state =
>> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>
>> The second one will validate that the key coder preserves order, and
>> fails otherwise (similar to coder determinism checking in GroupByKey). (BTW
>> we would also have versions of these functions that use coder inference to
>> "guess" the coder, but those will do the same checking)
>>
>> Also the API I proposed did support random access! We could separate out
>> OrderedBagState again 

Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Kenneth Knowles
On Fri, May 24, 2019 at 9:51 AM Kenneth Knowles  wrote:

>
>
> On Fri, May 24, 2019 at 8:14 AM Reuven Lax  wrote:
>
>> Some great comments!
>>
>> *Aljoscha*: absolutely this would have to be implemented by runners to
>> be efficient. We can of course provide a default (inefficient)
>> implementation, but ideally runners would provide better ones.
>>
>> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>>
>> *Robert* Great point about standard coders not satisfying this. That's
>> why I suggested that we provide a way to tag the coders that do preserve
>> order, and only accept those as key coders. Alternatively we could present a
>> more limited API - e.g. only allowing a hard-coded set of types to be used
>> as keys - but that seems counter to the direction Beam usually goes.
>>
>
> I think we got it right with GroupByKey: the encoded form of a key is
> authoritative/portable.
>
> Instead of seeing the in-language type as the "real" value and the coder a
> way to serialize it, the portable encoded bytestring is the "real" value
> and the representation in a particular SDK is the responsibility of the SDK.
>
> This is very important, because many in-language representations,
> especially low-level representations, do not have the desired equality. For
> example, Java arrays. Coder.structuralValue(...) is required to have
> equality that matches the equality of the encoded form. It can be a noop if
> the in-language equality already matches. Or it can be a full encoding if
> there is not a more efficient option. I think we could add another method
> "lexicalValue" or add the requirement that structuralValue also sort
> equivalently to the wire format.
>
> Now, since many of our wire formats do not sort in the natural
> mathematical order, SDKs should help users avoid the pitfall of using
> these, as we do for GBK by checking "determinism" of the coder. Note that I
> am *not* referring to the order they would sort in any particular
> programming language implementation.
>

Another related feature request. Today we do this:

(1) infer any coder for a type
(2) crash if it is not suitable for GBK

I have proposed in the past that instead we:

(1) notice that a PCollection is input to a GBK
(2) infer an appropriate coder for a type that will work with GBK

This generalizes to the idea of registering multiple coders for different
purposes, particularly inferring a coder that has good lexical sorting.

I don't recall that there was any objection, but neither I nor anyone else
has gotten around to working on this. It does have the problem that it
could change the coders and impact pipeline update. I suggest that update
is fragile enough that we should develop pipeline options that allow opt-in
to improvements without a major version bump.

Kenn


>
> Kenn
>
>
>> So users will have two ways of creating multimap state specs:
>>
>>private final StateSpec> state =
>> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>>
>> or
>>private final StateSpec> state =
>> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>>
>> The second one will validate that the key coder preserves order, and
>> fails otherwise (similar to coder determinism checking in GroupByKey). (BTW
>> we would also have versions of these functions that use coder inference to
>> "guess" the coder, but those will do the same checking)
>>
>> Also the API I proposed did support random access! We could separate out
>> OrderedBagState again if we think the use cases are fundamentally
>> different. I merged the proposal into that of MultimapState because there
>> seemed to be 99% overlap.
>>
>> Reuven
>>
>> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw 
>> wrote:
>>
>>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax  wrote:
>>> >
>>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  wrote:
>>> >>
>>> >>
>>> >>
>>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:
>>> >>>
>>> >>>
>>> >>>
>>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
>>> >
>>> > A few obvious problems with this code:
>>> >   1. Removing the elements already processed from the bag requires
>>> clearing and rewriting the entire bag. This is O(n^2) in the number of
>>> input trades.
>>> 
>>>  why it's not O(2 * n) to clearing and rewriting trade state?
>>> 
>>> >
>>> > public interface SortedMultimapState extends State {
>>> >   // Add a value to the map.
>>> >   void put(K key, V value);
>>> >   // Get all values for a given key.
>>> >   ReadableState> get(K key);
>>> >  // Return all entries in the map.
>>> >   ReadableState>> allEntries();
>>> >   // Return all entries in the map with keys <= limit. returned
>>> elements are sorted by the key.
>>> >   ReadableState>> entriesUntil(K limit);
>>> >
>>> >  // Remove all values with the given key;
>>> >   void remove(K key);
>>> >  // Remove all entries in the map with keys <= limit.
>>> >   void 

Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Kenneth Knowles
On Fri, May 24, 2019 at 8:14 AM Reuven Lax  wrote:

> Some great comments!
>
> *Aljoscha*: absolutely this would have to be implemented by runners to be
> efficient. We can of course provide a default (inefficient) implementation,
> but ideally runners would provide better ones.
>
> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>
> *Robert* Great point about standard coders not satisfying this. That's
> why I suggested that we provide a way to tag the coders that do preserve
> order, and only accept those as key coders. Alternatively we could present a
> more limited API - e.g. only allowing a hard-coded set of types to be used
> as keys - but that seems counter to the direction Beam usually goes.
>

I think we got it right with GroupByKey: the encoded form of a key is
authoritative/portable.

Instead of seeing the in-language type as the "real" value and the coder a
way to serialize it, the portable encoded bytestring is the "real" value
and the representation in a particular SDK is the responsibility of the SDK.

This is very important, because many in-language representations,
especially low-level representations, do not have the desired equality. For
example, Java arrays. Coder.structuralValue(...) is required to have
equality that matches the equality of the encoded form. It can be a noop if
the in-language equality already matches. Or it can be a full encoding if
there is not a more efficient option. I think we could add another method
"lexicalValue" or add the requirement that structuralValue also sort
equivalently to the wire format.

Now, since many of our wire formats do not sort in the natural mathematical
order, SDKs should help users avoid the pitfall of using these, as we do
for GBK by checking "determinism" of the coder. Note that I am *not*
referring to the order they would sort in any particular programming
language implementation.
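
To make the equality and ordering points concrete, here is a minimal sketch in plain Java (no Beam dependency). The class and method names are hypothetical, and the sign-bit flip is just one well-known way to get an order-preserving fixed-width encoding; it is not a description of any existing Beam coder. Arrays.compareUnsigned needs Java 9 or later.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class LexicalOrderSketch {
  // Plain big-endian two's complement: negative values start with 0xFF bytes.
  static byte[] encodeBigEndian(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Flipping the sign bit makes unsigned byte order agree with numeric order.
  static byte[] encodeOrderPreserving(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  // Bytewise lexicographic comparison, treating every byte as unsigned.
  static int compareEncoded(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  public static void main(String[] args) {
    byte[] x = {1, 2};
    byte[] y = {1, 2};
    // Java arrays use identity equality, so a structural value is needed.
    System.out.println(x.equals(y));         // false
    System.out.println(Arrays.equals(x, y)); // true

    // Naive encoding: -1 sorts after 1 when compared as raw bytes.
    System.out.println(
        compareEncoded(encodeBigEndian(-1L), encodeBigEndian(1L)) < 0); // false
    // Sign-flipped encoding: byte order matches numeric order.
    System.out.println(
        compareEncoded(encodeOrderPreserving(-1L), encodeOrderPreserving(1L)) < 0); // true
  }
}
```

The first pair of lines is the structuralValue problem; the second pair is the kind of check a "lexicalValue"-style contract would have to pass.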

Kenn


> So users will have two ways of creating multimap state specs:
>
>private final StateSpec> state =
> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>
> or
>private final StateSpec> state =
> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>
> The second one will validate that the key coder preserves order, and fails
> otherwise (similar to coder determinism checking in GroupByKey). (BTW we
> would also have versions of these functions that use coder inference to
> "guess" the coder, but those will do the same checking)
>
> Also the API I proposed did support random access! We could separate out
> OrderedBagState again if we think the use cases are fundamentally
> different. I merged the proposal into that of MultimapState because there
> seemed to be 99% overlap.
>
> Reuven
>
> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw 
> wrote:
>
>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax  wrote:
>> >
>> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  wrote:
>> >>
>> >>
>> >>
>> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:
>> >>>
>> >>>
>> >>>
>> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
>> >
>> > A few obvious problems with this code:
>> >   1. Removing the elements already processed from the bag requires
>> clearing and rewriting the entire bag. This is O(n^2) in the number of
>> input trades.
>> 
>>  why it's not O(2 * n) to clearing and rewriting trade state?
>> 
>> >
>> > public interface SortedMultimapState extends State {
>> >   // Add a value to the map.
>> >   void put(K key, V value);
>> >   // Get all values for a given key.
>> >   ReadableState> get(K key);
>> >  // Return all entries in the map.
>> >   ReadableState>> allEntries();
>> >   // Return all entries in the map with keys <= limit. returned
>> elements are sorted by the key.
>> >   ReadableState>> entriesUntil(K limit);
>> >
>> >  // Remove all values with the given key;
>> >   void remove(K key);
>> >  // Remove all entries in the map with keys <= limit.
>> >   void removeUntil(K limit);
>> 
>>  Will removeUntilExcl(K limit) also useful? It will remove all
>> entries in the map with keys < limit.
>> 
>> >
>> > Runners will sort based on the encoded value of the key. In order
>> to make this easier for users, I propose that we introduce a new tag on
>> Coders PreservesOrder. A Coder that contains this tag guarantees that the
>> encoded value preserves the same ordering as the base Java type.
>> 
>> 
>>  Could you clarify what is  "encoded value preserves the same
>> ordering as the base Java type"?
>> >>>
>> >>>
>> >>> Lets say A and B represent two different instances of the same Java
>> type like a double, then A < B (using the languages comparison operator)
>> iff encode(A) < encode(B) (note the encoded versions are compared
>> lexicographically)
>> >>
>> >>
>> >> Since coders are shared across SDKs, do we expect A < B iff e(A) <
>> e(P) property to hold for all languages we support? 

Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Reuven Lax
On Fri, May 24, 2019 at 9:36 AM Rui Wang  wrote:

>
>> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>>
>>>
>>> Regarding performance, I have a concern about dropping MapState or using
> SortedMapState to back it. Although SortedMapState provides an API to remove a
> single key, I would imagine its implementation in runners will differ
> from a map state implementation without an ordering requirement on map keys,
> which might give different performance on single-key access. For example,
> SortedMapState's single-key get API might be implemented as allEntriesInc(k)
> - allEntriesExcl(k).
>

It's a good question. In the Dataflow runner it would likely be the same
implementation. If that is not true for other runners, then it's worth
keeping both around.


>
> Also, I would assume MapState is implemented by some runners already. So
> would dropping the MapState API mean that runners have to change accordingly?
> Or do I misunderstand?
>
>
> -Rui
>
>


Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Lukasz Cwik
Reuven, for the example, I assume that we never want to store more than 2
values at a given sort key prefix, and if we do then we will create a new
longer prefix splitting up the values based upon the sort key.

Tuple representation in examples below is (key, sort key, value) and . is a
character outside of the alphabet which can be represented by using an
escaping encoding that wraps the key + sort key encoding.

To insert (key, 0010, value1), we look up "key" + all the prefixes of 0010,
looking for one that is not empty. In this case it is the empty prefix "", so
we append value1 to the map at key."" ending up with (we also set the key to
any dummy value to know that it contains values):
key: dummy value
key."": token, (0010, value1)
Now we insert (key, 0011, value2), we again look up "key" + all the prefixes
of 0011, finding "", so we append value2 to key."" ending up with:
key: dummy value
key."": token, (0010, value1), (0011, value2)
Now we insert (key, 1011, value3), we again lookup "key" + all the prefixes
of 1011 finding "" but notice that it is full, so we partition all the
values into two prefixes 0 and 1. We also clear the "" prefix ending up
with:
key: dummy value
key.0: token, (0010, value1), (0011, value2)
key.1: token, (1011, value3)
Now we insert (key, 0001, value4), we again lookup "key" + all the prefixes
of the value finding 0 but notice that it is full, so we partition all the
values into two prefixes 00 and 01 but notice this doesn't help us since 00
will be too full so we split 00 again to 000, 001. We also clear the 0
prefix ending up with:
key: dummy value
key.000: token, (0001, value4)
key.001: token, (0010, value1), (0011, value2)
key.01: token
key.1: token, (1011, value3)

We are effectively building a trie[1] where we only have values at the
leaves and control how full each leaf can be. There are other trie
representations like a radix tree that may be better.

Looking up the values in sorted order for "key" would go like this:
Is key set, yes
look for key."", miss
look for key.0, miss
look for key.00, miss
look for key.000, hit, sort all contained values using secondary key,
provide value4 to user
look for key.001, hit, sort all contained values using secondary key,
provide value1 followed by value2 to user
look for key.01, hit, empty, return no values to user
look for key.1, hit, sort all contained values using secondary key, provide
value3 to user
we have walked the entire prefix space, signal end of iterable

Some notes for the above:
* The dummy value is used to know that the key contains values and the
token is to know whether there are any values deeper in the trie, so we
know when to stop searching.
* If we can recalculate the sort key from the combination of the key and
value, then we don't need to store it.
* Keys with lots of values will perform worse than keys with fewer values
since we have to look up more keys but they will be empty reads. The number
of misses can be controlled by how many elements we are willing to store at
a given node before we subdivide.

In reality you could build a lot of structures (e.g. red black tree, binary
tree) using the sort key, the issue is the cost of
rebalancing/re-organizing the structure in map form and whether it has a
convenient pre-order traversal for lookups.
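
The insert and lookup walk described above can be sketched roughly as follows. This is an in-memory illustration only: a HashMap stands in for the per-key map state, a present-but-empty bucket plays the role of the "token", and all names (PrefixBucketSketch, CAPACITY, BITS) are made up for the example. It assumes fixed-length binary sort keys and does not implement removal or merging of prefixes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class PrefixBucketSketch {
  static final int CAPACITY = 2; // max entries per bucket before we split
  static final int BITS = 4;     // every sort key must have exactly this length

  // Flat map: bucket prefix -> list of {sortKey, value} pairs.
  final Map<String, List<String[]>> buckets = new HashMap<>();

  void insert(String sortKey, String value) {
    if (buckets.isEmpty()) {
      buckets.put("", new ArrayList<>()); // first insert starts at the empty prefix
    }
    // Probe prefixes from shortest to longest until an existing bucket is hit.
    String prefix = "";
    while (!buckets.containsKey(prefix)) {
      prefix = sortKey.substring(0, prefix.length() + 1);
    }
    buckets.get(prefix).add(new String[] {sortKey, value});
    split(prefix);
  }

  // Replace an overfull bucket by its two children, recursing while needed.
  void split(String prefix) {
    List<String[]> bucket = buckets.get(prefix);
    if (bucket.size() <= CAPACITY || prefix.length() == BITS) {
      return;
    }
    buckets.remove(prefix);
    buckets.put(prefix + "0", new ArrayList<>());
    buckets.put(prefix + "1", new ArrayList<>());
    for (String[] entry : bucket) {
      buckets.get(entry[0].substring(0, prefix.length() + 1)).add(entry);
    }
    split(prefix + "0");
    split(prefix + "1");
  }

  // Pre-order walk over the prefix space, emitting values in sort-key order.
  List<String> valuesInOrder() {
    List<String> out = new ArrayList<>();
    if (!buckets.isEmpty()) {
      walk("", out);
    }
    return out;
  }

  void walk(String prefix, List<String> out) {
    List<String[]> bucket = buckets.get(prefix);
    if (bucket != null) { // hit: sort the leaf locally and emit its values
      bucket.stream()
          .sorted(Comparator.comparing((String[] e) -> e[0]))
          .forEach(e -> out.add(e[1]));
    } else if (prefix.length() < BITS) { // miss: descend into both children
      walk(prefix + "0", out);
      walk(prefix + "1", out);
    }
  }

  public static void main(String[] args) {
    PrefixBucketSketch state = new PrefixBucketSketch();
    state.insert("0010", "value1");
    state.insert("0011", "value2");
    state.insert("1011", "value3");
    state.insert("0001", "value4");
    System.out.println(new TreeSet<>(state.buckets.keySet())); // [000, 001, 01, 1]
    System.out.println(state.valuesInOrder()); // [value4, value1, value2, value3]
  }
}
```

Running main replays the four inserts from the example and ends with the same buckets (000, 001, 01, 1), with 01 present but empty.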



On Fri, May 24, 2019 at 8:14 AM Reuven Lax  wrote:

> Some great comments!
>
> *Aljoscha*: absolutely this would have to be implemented by runners to be
> efficient. We can of course provide a default (inefficient) implementation,
> but ideally runners would provide better ones.
>
> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>
> *Robert* Great point about standard coders not satisfying this. That's
> why I suggested that we provide a way to tag the coders that do preserve
> order, and only accept those as key coders. Alternatively we could present a
> more limited API - e.g. only allowing a hard-coded set of types to be used
> as keys - but that seems counter to the direction Beam usually goes. So
> users will have two ways of creating multimap state specs:
>
>private final StateSpec> state =
> StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());
>
> or
>private final StateSpec> state =
> StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());
>
> The second one will validate that the key coder preserves order, and fails
> otherwise (similar to coder determinism checking in GroupByKey). (BTW we
> would also have versions of these functions that use coder inference to
> "guess" the coder, but those will do the same checking)
>
> Also the API I proposed did support random access! We could separate out
> OrderedBagState again if we think the use cases are fundamentally
> different. I merged the proposal into that of MultimapState because there
> seemed to be 99% overlap.
>
> Reuven
>
> On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw 
> wrote:
>
>> On Fri, May 24, 2019 at 5:32 AM Reuven Lax  wrote:
>> >
>> > On Thu, May 23, 2019 at 1:53 PM 

Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Rui Wang
>
>
> *Jan* Exactly. I think MapState can be dropped or backed by this. E.g.
>
>>
Regarding performance, I have a concern about dropping MapState or using
SortedMapState to back it. Although SortedMapState provides an API to remove a
single key, I would imagine its implementation in runners will differ
from a map state implementation without an ordering requirement on map keys,
which might give different performance on single-key access. For example,
SortedMapState's single-key get API might be implemented as allEntriesInc(k)
- allEntriesExcl(k).

Also, I would assume MapState is implemented by some runners already. So
would dropping the MapState API mean that runners have to change accordingly?
Or do I misunderstand?
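
For reference, a rough in-memory sketch of the semantics under discussion, using java.util.TreeMap as a stand-in. It only illustrates what the operations mean, including the inclusive and exclusive range variants; it says nothing about how a runner would implement or price them, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class SortedMultimapSketch {
  private final TreeMap<Long, List<String>> map = new TreeMap<>();

  // Add a value under the given key; a key may hold several values.
  void put(long key, String value) {
    map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  // Single-key access is a direct lookup here, not a difference of two ranges.
  List<String> get(long key) {
    return map.getOrDefault(key, List.of());
  }

  // All values with keys <= limit, in key order.
  List<String> entriesUntil(long limit) {
    List<String> out = new ArrayList<>();
    map.headMap(limit, true).values().forEach(out::addAll);
    return out;
  }

  // Remove all values stored under one key.
  void remove(long key) {
    map.remove(key);
  }

  // Remove all entries with keys <= limit.
  void removeUntil(long limit) {
    map.headMap(limit, true).clear();
  }

  // Remove all entries with keys < limit.
  void removeUntilExcl(long limit) {
    map.headMap(limit, false).clear();
  }

  public static void main(String[] args) {
    SortedMultimapSketch state = new SortedMultimapSketch();
    state.put(3L, "c");
    state.put(1L, "a");
    state.put(2L, "b");
    state.put(2L, "b2");
    System.out.println(state.entriesUntil(2L));  // [a, b, b2]
    state.removeUntilExcl(2L);
    System.out.println(state.entriesUntil(10L)); // [b, b2, c]
    state.removeUntil(2L);
    System.out.println(state.entriesUntil(10L)); // [c]
  }
}
```

In this stand-in the single-key path and the range path share one structure, which is the situation where keeping a separate MapState buys little; a runner whose sorted storage has no cheap point lookup would be the opposite case.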


-Rui


Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Reuven Lax
Some great comments!

*Aljoscha*: absolutely this would have to be implemented by runners to be
efficient. We can of course provide a default (inefficient) implementation,
but ideally runners would provide better ones.

*Jan* Exactly. I think MapState can be dropped or backed by this. E.g.

*Robert* Great point about standard coders not satisfying this. That's why
I suggested that we provide a way to tag the coders that do preserve order,
and only accept those as key coders. Alternatively we could present a more
limited API - e.g. only allowing a hard-coded set of types to be used as
keys - but that seems counter to the direction Beam usually goes. So users
will have two ways of creating multimap state specs:

   private final StateSpec> state =
StateSpecs.multimap(VarLongCoder.of(), StringUtf8Coder.of());

or
   private final StateSpec> state =
StateSpecs.orderedMultimap(VarLongCoder.of(), StringUtf8Coder.of());

The second one will validate that the key coder preserves order, and fails
otherwise (similar to coder determinism checking in GroupByKey). (BTW we
would also have versions of these functions that use coder inference to
"guess" the coder, but those will do the same checking)

Also the API I proposed did support random access! We could separate out
OrderedBagState again if we think the use cases are fundamentally
different. I merged the proposal into that of MultimapState because there
seemed to be 99% overlap.

Reuven

On Fri, May 24, 2019 at 6:19 AM Robert Bradshaw  wrote:

> On Fri, May 24, 2019 at 5:32 AM Reuven Lax  wrote:
> >
> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  wrote:
> >>
> >>
> >>
> >> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:
> >>>
> >>>
> >>>
> >>> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
> >
> > A few obvious problems with this code:
> >   1. Removing the elements already processed from the bag requires
> clearing and rewriting the entire bag. This is O(n^2) in the number of
> input trades.
> 
>  why it's not O(2 * n) to clearing and rewriting trade state?
> 
> >
> > public interface SortedMultimapState extends State {
> >   // Add a value to the map.
> >   void put(K key, V value);
> >   // Get all values for a given key.
> >   ReadableState> get(K key);
> >  // Return all entries in the map.
> >   ReadableState>> allEntries();
> >   // Return all entries in the map with keys <= limit. returned
> elements are sorted by the key.
> >   ReadableState>> entriesUntil(K limit);
> >
> >  // Remove all values with the given key;
> >   void remove(K key);
> >  // Remove all entries in the map with keys <= limit.
> >   void removeUntil(K limit);
> 
>  Will removeUntilExcl(K limit) also useful? It will remove all entries
> in the map with keys < limit.
> 
> >
> > Runners will sort based on the encoded value of the key. In order to
> make this easier for users, I propose that we introduce a new tag on Coders
> PreservesOrder. A Coder that contains this tag guarantees that the encoded
> value preserves the same ordering as the base Java type.
> 
> 
>  Could you clarify what is  "encoded value preserves the same ordering
> as the base Java type"?
> >>>
> >>>
> >>> Lets say A and B represent two different instances of the same Java
> type like a double, then A < B (using the languages comparison operator)
> iff encode(A) < encode(B) (note the encoded versions are compared
> lexicographically)
> >>
> >>
> >> Since coders are shared across SDKs, do we expect A < B iff e(A) < e(P)
> property to hold for all languages we support? What happens A, B sort
> differently in different languages?
> >
> >
> > That would have to be the property of the coder (which means that this
> property probably needs to be represented in the portability representation
> of the coder). I imagine the common use cases will be for simple coders
> like int, long, string, etc., which are likely to sort the same in most
> languages.
>
> The standard coders for both double and integral types do not respect
> the natural ordering (consider negative values). KV coders violate the
> "natural" lexicographic ordering on components as well. I think
> implicitly sorting on encoded value would yield many surprises. (The
> state, of course, could take a order-preserving, bytes
> (string?)-producing callable as a parameter of course). (As for
> naming, I'd probably call this OrderedBagState or something like
> that...rather than Map which tends to imply random access.)
>


Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Robert Bradshaw
On Fri, May 24, 2019 at 5:32 AM Reuven Lax  wrote:
>
> On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  wrote:
>>
>>
>>
>> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:
>>>
>>>
>>>
>>> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
>
> A few obvious problems with this code:
>   1. Removing the elements already processed from the bag requires 
> clearing and rewriting the entire bag. This is O(n^2) in the number of 
> input trades.

 why it's not O(2 * n) to clearing and rewriting trade state?

>
> public interface SortedMultimapState extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key.
>   ReadableState> get(K key);
>  // Return all entries in the map.
>   ReadableState>> allEntries();
>   // Return all entries in the map with keys <= limit. returned elements 
> are sorted by the key.
>   ReadableState>> entriesUntil(K limit);
>
>  // Remove all values with the given key;
>   void remove(K key);
>  // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);

 Will removeUntilExcl(K limit) also useful? It will remove all entries in 
 the map with keys < limit.

>
> Runners will sort based on the encoded value of the key. In order to make 
> this easier for users, I propose that we introduce a new tag on Coders 
> PreservesOrder. A Coder that contains this tag guarantees that the 
> encoded value preserves the same ordering as the base Java type.


 Could you clarify what is  "encoded value preserves the same ordering as 
 the base Java type"?
>>>
>>>
>>> Lets say A and B represent two different instances of the same Java type 
>>> like a double, then A < B (using the languages comparison operator) iff 
>>> encode(A) < encode(B) (note the encoded versions are compared 
>>> lexicographically)
>>
>>
>> Since coders are shared across SDKs, do we expect A < B iff e(A) < e(P) 
>> property to hold for all languages we support? What happens A, B sort 
>> differently in different languages?
>
>
> That would have to be the property of the coder (which means that this 
> property probably needs to be represented in the portability representation 
> of the coder). I imagine the common use cases will be for simple coders like 
> int, long, string, etc., which are likely to sort the same in most languages.

The standard coders for both double and integral types do not respect
the natural ordering (consider negative values). KV coders violate the
"natural" lexicographic ordering on components as well. I think
implicitly sorting on encoded value would yield many surprises. (The
state, of course, could take a order-preserving, bytes
(string?)-producing callable as a parameter of course). (As for
naming, I'd probably call this OrderedBagState or something like
that...rather than Map which tends to imply random access.)
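
A small illustration of the kind of surprise meant here, in plain Java. The framing below (one length byte followed by the UTF-8 bytes) is a made-up stand-in for any length-prefixed encoding of a component; it is not Beam's actual coder code. Arrays.compareUnsigned needs Java 9 or later.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class LengthPrefixOrderSketch {
  // Hypothetical framing: a single length byte, then the UTF-8 bytes.
  // Sketch only: assumes the encoded string is shorter than 128 bytes.
  static byte[] encodeLengthPrefixed(String s) {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    byte[] out = new byte[utf8.length + 1];
    out[0] = (byte) utf8.length;
    System.arraycopy(utf8, 0, out, 1, utf8.length);
    return out;
  }

  // True when a sorts strictly before b as raw unsigned bytes.
  static boolean encodedBefore(String a, String b) {
    return Arrays.compareUnsigned(encodeLengthPrefixed(a), encodeLengthPrefixed(b)) < 0;
  }

  public static void main(String[] args) {
    // Natural order: "aa" comes before "b".
    System.out.println("aa".compareTo("b") < 0);  // true
    // Encoded order: the length byte dominates, so "b" comes first.
    System.out.println(encodedBefore("aa", "b")); // false
    System.out.println(encodedBefore("b", "aa")); // true
  }
}
```

Any composite encoding that frames its first component this way inherits the same mismatch, regardless of how the remaining components are encoded.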


Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Jan Lukavský

Hi, absolutely +1 to add this to the model, but does this imply that
MapState can be dropped (or backed by this)? It can have different insert or
delete time complexity (O(1) instead of O(log n)).

Jan

-- Original e-mail --
From: Aljoscha Krettek
To: dev@beam.apache.org
Date: 24. 5. 2019 10:56:45
Subject: Re: DISCUSS: Sorted MapState API
> This is quite interesting! The Flink Table API (relational and SQL) has an
> implementation for the type of join you mention in the example. We call it
> Temporal Table Join, and it works on something we call Temporal Tables:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
>
> The implementation would have benefited from something like a Sorted
> MapState and we actually discussed adding such a state type during
> implementation. You can still see that in the TODOs here, actually:
> https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95
>
> So I’m +1 for this, but someone would have to implement that for the
> different Runners as well.
>
> Aljoscha
>
> > On 24. May 2019, at 05:32, Reuven Lax wrote:
> >
> > On Thu, May 23, 2019 at 1:53 PM Ahmet Altay wrote:
> >
> > On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik wrote:
> >
> > On Thu, May 23, 2019 at 11:37 AM Rui Wang wrote:
> >
> > A few obvious problems with this code:
> >   1. Removing the elements already processed from the bag requires clearing
> > and rewriting the entire bag. This is O(n^2) in the number of input trades.
> >
> > why it's not O(2 * n) to clearing and rewriting trade state?
> >
> > public interface SortedMultimapState extends State {
> >   // Add a value to the map.
> >   void put(K key, V value);
> >   // Get all values for a given key.
> >   ReadableState> get(K key);
> >  // Return all entries in the map.
> >   ReadableState>> allEntries();
> >   // Return all entries in the map with keys <= limit. returned elements are
> > sorted by the key.
> >   ReadableState>> entriesUntil(K limit);
> >
> >  // Remove all values with the given key;
> >   void remove(K key);
> >  // Remove all entries in the map with keys <= limit.
> >   void removeUntil(K limit);
> >
> > Will removeUntilExcl(K limit) also useful? It will remove all entries in the
> > map with keys < limit.
> >
> > Runners will sort based on the encoded value of the key. In order to make
> > this easier for users, I propose that we introduce a new tag on Coders
> > PreservesOrder. A Coder that contains this tag guarantees that the encoded
> > value preserves the same ordering as the base Java type.
> >
> > Could you clarify what is  "encoded value preserves the same ordering as the
> > base Java type"?
> >
> > Lets say A and B represent two different instances of the same Java type
> > like a double, then A < B (using the languages comparison operator) iff
> > encode(A) < encode(B) (note the encoded versions are compared
> > lexicographically)
> >
> > Since coders are shared across SDKs, do we expect A < B iff e(A) < e(P)
> > property to hold for all languages we support? What happens A, B sort
> > differently in different languages?
> >
> > That would have to be the property of the coder (which means that this
> > property probably needs to be represented in the portability representation
> > of the coder). I imagine the common use cases will be for simple coders like
> > int, long, string, etc., which are likely to sort the same in most
> > languages.
> >
> > -Rui

Re: DISCUSS: Sorted MapState API

2019-05-24 Thread Aljoscha Krettek
This is quite interesting! The Flink Table API (relational and SQL) has an 
implementation for the type of join you mention in the example. We call it 
Temporal Table Join, and it works on something we call Temporal Tables: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/temporal_tables.html
 


The implementation would have benefited from something like a Sorted MapState 
and we actually discussed adding such a state type during implementation. You 
can still see that in the TODOs here, actually: 
https://github.com/apache/flink/blob/0ab1549f52f1f544e8492757c6b0d562bf50a061/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/join/TemporalRowtimeJoin.scala#L95
 


So I’m +1 for this, but someone would have to implement that for the different 
Runners as well. 

Aljoscha

> On 24. May 2019, at 05:32, Reuven Lax  wrote:
> 
> 
> 
> On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  > wrote:
> 
> 
> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  > wrote:
> 
> 
> On Thu, May 23, 2019 at 11:37 AM Rui Wang  > wrote:
> A few obvious problems with this code:
>   1. Removing the elements already processed from the bag requires clearing 
> and rewriting the entire bag. This is O(n^2) in the number of input trades.
> why it's not O(2 * n) to clearing and rewriting trade state?
> 
> 
> public interface SortedMultimapState extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key. 
>   ReadableState> get(K key);
>  // Return all entries in the map.
>   ReadableState>> allEntries();
>   // Return all entries in the map with keys <= limit. returned elements are 
> sorted by the key.
>   ReadableState>> entriesUntil(K limit); 
>  // Remove all values with the given key;
>   void remove(K key);
>  // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);
> Will removeUntilExcl(K limit) also useful? It will remove all entries in the 
> map with keys < limit.
>  
> Runners will sort based on the encoded value of the key. In order to make 
> this easier for users, I propose that we introduce a new tag on Coders 
> PreservesOrder. A Coder that contains this tag guarantees that the encoded 
> value preserves the same ordering as the base Java type.
> 
> Could you clarify what is  "encoded value preserves the same ordering as the 
> base Java type"?
> 
> Lets say A and B represent two different instances of the same Java type like 
> a double, then A < B (using the languages comparison operator) iff encode(A) 
> < encode(B) (note the encoded versions are compared lexicographically)
> 
> Since coders are shared across SDKs, do we expect A < B iff e(A) < e(P) 
> property to hold for all languages we support? What happens A, B sort 
> differently in different languages? 
> 
> That would have to be the property of the coder (which means that this 
> property probably needs to be represented in the portability representation 
> of the coder). I imagine the common use cases will be for simple coders like 
> int, long, string, etc., which are likely to sort the same in most languages.
>  
>  
>  
> 
> -Rui



Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Reuven Lax
On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  wrote:

>
>
> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:
>
>>
>>
>> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
>>
>>>> A few obvious problems with this code:
>>>>   1. Removing the elements already processed from the bag requires
>>>> clearing and rewriting the entire bag. This is O(n^2) in the number of
>>>> input trades.
>>>>
>>> why it's not O(2 * n) to clearing and rewriting trade state?
>>>
>>>
>>>> public interface SortedMultimapState extends State {
>>>>   // Add a value to the map.
>>>>   void put(K key, V value);
>>>>   // Get all values for a given key.
>>>>   ReadableState> get(K key);
>>>>  // Return all entries in the map.
>>>>   ReadableState>> allEntries();
>>>>   // Return all entries in the map with keys <= limit. returned
>>>> elements are sorted by the key.
>>>>   ReadableState>> entriesUntil(K limit);
>>>>
>>>>  // Remove all values with the given key;
>>>>   void remove(K key);
>>>>  // Remove all entries in the map with keys <= limit.
>>>>   void removeUntil(K limit);
>>>>
>>> Will removeUntilExcl(K limit) also useful? It will remove all entries in
>>> the map with keys < limit.
>>>
>>>
>>>> Runners will sort based on the encoded value of the key. In order to
>>>> make this easier for users, I propose that we introduce a new tag on Coders
>>>> *PreservesOrder*. A Coder that contains this tag guarantees that the
>>>> encoded value preserves the same ordering as the base Java type.

>>>
>>> Could you clarify what is  "encoded value preserves the same ordering as
>>> the base Java type"?
>>>
>>
>> Lets say A and B represent two different instances of the same Java type
>> like a double, then A < B (using the languages comparison operator) iff
>> encode(A) < encode(B) (note the encoded versions are compared
>> lexicographically)
>>
>
> Since coders are shared across SDKs, do we expect A < B iff e(A) < e(P)
> property to hold for all languages we support? What happens A, B sort
> differently in different languages?
>

That would have to be the property of the coder (which means that this
property probably needs to be represented in the portability representation
of the coder). I imagine the common use cases will be for simple coders
like int, long, string, etc., which are likely to sort the same in most
languages.


>
>>
>>
>>>
>>> -Rui
>>>
>>


Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Reuven Lax
On Thu, May 23, 2019 at 11:23 AM Lukasz Cwik  wrote:

> I would suggest that we drop MapState and instead support MultimapState
> without ordering as a first pass and potentially add ordering later.
>

While I agree that MultimapState is useful on its own, for these
aggregation use cases ordering is actually the important factor. I
considered suggesting a SortedBagState, but that just ends up looking like
MultimapState.

Are you suggesting this as an implementation strategy, that we implement
MultimapState first and then add ordering? If so I agree, however in that
case the community should still discuss the ordered state now, as we'll
likely want to add ordering shortly thereafter.

Agreed that we don't need both MapState and MultimapState. Alternatively,
we could reimplement MapState as a simple wrapper on top of
MultimapState.
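
A minimal sketch of the wrapper idea, under the assumption that a map is just
a multimap restricted to one value per key. SimpleMultimap and MapView are
hypothetical names used for illustration only; they are not Beam APIs, and a
real MapState wrapper would sit on top of the runner-backed state instead of
a HashMap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class MapOverMultimap {
  // Hypothetical in-memory multimap standing in for MultimapState.
  static class SimpleMultimap<K, V> {
    private final Map<K, List<V>> data = new HashMap<>();

    void put(K key, V value) {
      data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    List<V> get(K key) {
      return data.getOrDefault(key, Collections.emptyList());
    }

    void remove(K key) {
      data.remove(key);
    }
  }

  // Map semantics on top of the multimap: at most one value per key.
  static class MapView<K, V> {
    private final SimpleMultimap<K, V> backing;

    MapView(SimpleMultimap<K, V> backing) {
      this.backing = backing;
    }

    // Overwrite by removing any existing values before adding the new one.
    void put(K key, V value) {
      backing.remove(key);
      backing.put(key, value);
    }

    Optional<V> get(K key) {
      List<V> values = backing.get(key);
      return values.isEmpty() ? Optional.empty() : Optional.of(values.get(0));
    }

    void remove(K key) {
      backing.remove(key);
    }
  }

  public static void main(String[] args) {
    MapView<String, Integer> view = new MapView<>(new SimpleMultimap<>());
    view.put("a", 1);
    view.put("a", 2); // replaces the previous value
    System.out.println(view.get("a"));
  }
}
```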


> Inside an SDK we would be able to build a "sorted" multimap state that
> uses a prefix of the key and implement radix based sorting. Any time a
> prefix has too many elements, you rebalance and split it into 2 and use a
> slightly longer prefix. Similarly, removing elements makes prefixes merge
> and become shorter.
>

Can you explain this more? How would this allow us to have a sorted state
without having to read the entire state into memory?


>
>
>
>
> On Thu, May 23, 2019 at 10:36 AM Reuven Lax  wrote:
>
>> Beam's state API is intended to be useful as an alternative aggregation
>> method. Stateful ParDos can aggregate elements into state and set timers to
>> read the state.  Currently Beam's state API supports two ways of storing
>> collections of elements: BagState and MapState. BagState is the one most
>> commonly used for these aggregations, likely for the practical reason that it's
>> the only one that has a scalable implementation on several runners.
>>
>> There are two major limitations of bag state:
>> * It is immutable: To delete an element from BagState, you must read and
>> rewrite the entire bag.
>> * It is unsorted: Almost all use cases of BagState need the elements in
>> some order (usually timestamp order). Since BagState is unsorted, every
>> time it's read it must be read in full and sorted in the ParDo.
>>
>> To make this concrete, consider a ParDo that does timeseries joins. There
>> are two types of input events: trades and quotes. A quote tells the
>> current price of a stock. The goal of the ParDo is to pair each stock's
>> trades with the closest quote that precedes it in time.
>>
>> A simple implementation would look as follows (pseudocode):
>>
>> void process(@Element Event event, @Timestamp Instant timestamp) {
>>   if (event.type() == TRADE) {
>>     tradesBag.add(event.trade());
>>     // Only ever move the timer earlier.
>>     if (timestamp < existingTimerTime) {
>>       setTimer(processTradeTimer, timestamp);
>>     }
>>   } else {
>>     quotesBag.add(event.quote());
>>   }
>> }
>>
>> void onTimer(@Timestamp Instant timestamp) {
>>   List<Quote> quotes = sort(quotesBag.get());
>>   List<Trade> trades = sort(tradesBag.get());
>>   for (Trade trade : trades) {
>>     if (trade.timestamp() > timestamp) break;
>>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>>     // Output KV<trade, quote>
>>   }
>>
>>   List<Trade> newTrades = /* remaining trades. */
>>   tradesBag.clear();
>>   // Rewrite the bag with only the unprocessed trades.
>>   newTrades.forEach(tradesBag::add);
>> }
>>
>> A few obvious problems with this code:
>>   1. Removing the elements already processed from the bag requires
>> clearing and rewriting the entire bag. This is O(n^2) in the number of
>> input trades.
>>   2. Both bags need to be fetched and sorted on every timer. This is
>> O(n^2*logn) in the size of the input PCollections.
>>   3. BagState is designed so that the input iterables can be large; the
>> entire BagState shouldn't have to fit in memory. However, this stops working
>> when you need to sort it, because we must page the entire BagState into
>> memory in order to sort it.
>>
>> Some of these problems can be alleviated with separate GC, caching of
>> BagState fetches, etc. However this results in increasingly complicated
>> code, and does not solve all the problems above.
>>
>> I'd like to propose a new state type to solve the above problems:
>> SortedMultimapState. SortedMultimapState could have the following signature:
>>
>> public interface SortedMultimapState<K, V> extends State {
>>   // Add a value to the map.
>>   void put(K key, V value);
>>   // Get all values for a given key.
>>   ReadableState<Iterable<V>> get(K key);
>>   // Return all entries in the map.
>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>   // Return all entries in the map with keys <= limit. Returned elements
>>   // are sorted by the key.
>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>   // Remove all values with the given key.
>>   void remove(K key);
>>   // Remove all entries in the map with keys <= limit.
>>   void removeUntil(K limit);
>> }
>>
>> Runners will sort based on the encoded value of the key. In order to make
>> this easier for users, I propose that we introduce a new tag on Coders
>> *PreservesOrder*. A Coder that contains this tag guarantees 

Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Reza Rokni
+100. As someone who has just spent a lot of time coding all the "GC,
caching of BagState fetches, etc.", this would make life a lot easier!

It's also generally valuable for a lot of timeseries work.

On Fri, 24 May 2019 at 04:59, Lukasz Cwik  wrote:

>
>
> On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  wrote:
>
>>
>>
>> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:
>>
>>>
>>>
>>> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
>>>
 A few obvious problems with this code:
>   1. Removing the elements already processed from the bag requires
> clearing and rewriting the entire bag. This is O(n^2) in the number of
> input trades.
>
 Why isn't clearing and rewriting the trade state O(2 * n)?


> public interface SortedMultimapState<K, V> extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key.
>   ReadableState<Iterable<V>> get(K key);
>   // Return all entries in the map.
>   ReadableState<Iterable<KV<K, V>>> allEntries();
>   // Return all entries in the map with keys <= limit. Returned
>   // elements are sorted by the key.
>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>
>   // Remove all values with the given key.
>   void remove(K key);
>   // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);
>
 Will removeUntilExcl(K limit) also be useful? It will remove all entries
 in the map with keys < limit.


> Runners will sort based on the encoded value of the key. In order to
> make this easier for users, I propose that we introduce a new tag on 
> Coders
> *PreservesOrder*. A Coder that contains this tag guarantees that the
> encoded value preserves the same ordering as the base Java type.
>

 Could you clarify what "encoded value preserves the same ordering
 as the base Java type" means?

>>>
>>> Let's say A and B represent two different instances of the same Java type,
>>> like a double. Then A < B (using the language's comparison operator) iff
>>> encode(A) < encode(B) (note that the encoded versions are compared
>>> lexicographically).
>>>
>>
>> Since coders are shared across SDKs, do we expect the A < B iff e(A) < e(B)
>> property to hold for all languages we support? What happens if A and B sort
>> differently in different languages?
>>
>
> I assume it would need to mean that the property holds across all
> languages or the language has to use a specific wrapper type which honors
> the sorted order within that language. It is likely that the runner is
> doing the sorting and the runner may or may not be written in the same
> language the SDK is executing in.
>
>
>>
>>>
>>>

 -Rui

>>>



Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Lukasz Cwik
On Thu, May 23, 2019 at 1:53 PM Ahmet Altay  wrote:

>
>
> On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:
>
>>
>>
>> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
>>
>>> A few obvious problems with this code:
   1. Removing the elements already processed from the bag requires
 clearing and rewriting the entire bag. This is O(n^2) in the number of
 input trades.

>>> Why isn't clearing and rewriting the trade state O(2 * n)?
>>>
>>>
 public interface SortedMultimapState<K, V> extends State {
   // Add a value to the map.
   void put(K key, V value);
   // Get all values for a given key.
   ReadableState<Iterable<V>> get(K key);
   // Return all entries in the map.
   ReadableState<Iterable<KV<K, V>>> allEntries();
   // Return all entries in the map with keys <= limit. Returned
   // elements are sorted by the key.
   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);

>>>  // Remove all values with the given key;
   void remove(K key);
  // Remove all entries in the map with keys <= limit.
   void removeUntil(K limit);

>>> Will removeUntilExcl(K limit) also be useful? It will remove all entries in
>>> the map with keys < limit.
>>>
>>>
 Runners will sort based on the encoded value of the key. In order to
 make this easier for users, I propose that we introduce a new tag on Coders
 *PreservesOrder*. A Coder that contains this tag guarantees that the
 encoded value preserves the same ordering as the base Java type.

>>>
>>> Could you clarify what "encoded value preserves the same ordering as
>>> the base Java type" means?
>>>
>>
>> Let's say A and B represent two different instances of the same Java type,
>> like a double. Then A < B (using the language's comparison operator) iff
>> encode(A) < encode(B) (note that the encoded versions are compared
>> lexicographically).
>>
>
> Since coders are shared across SDKs, do we expect the A < B iff e(A) < e(B)
> property to hold for all languages we support? What happens if A and B sort
> differently in different languages?
>

I assume it would mean either that the property holds across all languages,
or that a language has to use a specific wrapper type which honors the sorted
order within that language. It is likely that the runner is doing the
sorting, and the runner may or may not be written in the same language the
SDK is executing in.


>
>>
>>
>>>
>>> -Rui
>>>
>>


Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Ahmet Altay
On Thu, May 23, 2019 at 1:38 PM Lukasz Cwik  wrote:

>
>
> On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:
>
>> A few obvious problems with this code:
>>>   1. Removing the elements already processed from the bag requires
>>> clearing and rewriting the entire bag. This is O(n^2) in the number of
>>> input trades.
>>>
>> Why isn't clearing and rewriting the trade state O(2 * n)?
>>
>>
>>> public interface SortedMultimapState<K, V> extends State {
>>>   // Add a value to the map.
>>>   void put(K key, V value);
>>>   // Get all values for a given key.
>>>   ReadableState<Iterable<V>> get(K key);
>>>   // Return all entries in the map.
>>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>>   // Return all entries in the map with keys <= limit. Returned elements
>>>   // are sorted by the key.
>>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>>
>>>   // Remove all values with the given key.
>>>   void remove(K key);
>>>   // Remove all entries in the map with keys <= limit.
>>>   void removeUntil(K limit);
>>>
>> Will removeUntilExcl(K limit) also be useful? It will remove all entries in
>> the map with keys < limit.
>>
>>
>>> Runners will sort based on the encoded value of the key. In order to
>>> make this easier for users, I propose that we introduce a new tag on Coders
>>> *PreservesOrder*. A Coder that contains this tag guarantees that the
>>> encoded value preserves the same ordering as the base Java type.
>>>
>>
>> Could you clarify what "encoded value preserves the same ordering as
>> the base Java type" means?
>>
>
> Let's say A and B represent two different instances of the same Java type,
> like a double. Then A < B (using the language's comparison operator) iff
> encode(A) < encode(B) (note that the encoded versions are compared
> lexicographically).
>

Since coders are shared across SDKs, do we expect the A < B iff e(A) < e(B)
property to hold for all languages we support? What happens if A and B sort
differently in different languages?

>
>
>
>>
>> -Rui
>>
>


Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Lukasz Cwik
On Thu, May 23, 2019 at 11:37 AM Rui Wang  wrote:

> A few obvious problems with this code:
>>   1. Removing the elements already processed from the bag requires
>> clearing and rewriting the entire bag. This is O(n^2) in the number of
>> input trades.
>>
> Why isn't clearing and rewriting the trade state O(2 * n)?
>
>
>> public interface SortedMultimapState<K, V> extends State {
>>   // Add a value to the map.
>>   void put(K key, V value);
>>   // Get all values for a given key.
>>   ReadableState<Iterable<V>> get(K key);
>>   // Return all entries in the map.
>>   ReadableState<Iterable<KV<K, V>>> allEntries();
>>   // Return all entries in the map with keys <= limit. Returned elements
>>   // are sorted by the key.
>>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>>
>>   // Remove all values with the given key.
>>   void remove(K key);
>>   // Remove all entries in the map with keys <= limit.
>>   void removeUntil(K limit);
>>
> Will removeUntilExcl(K limit) also be useful? It will remove all entries in
> the map with keys < limit.
>
>
>> Runners will sort based on the encoded value of the key. In order to make
>> this easier for users, I propose that we introduce a new tag on Coders
>> *PreservesOrder*. A Coder that contains this tag guarantees that the
>> encoded value preserves the same ordering as the base Java type.
>>
>
> Could you clarify what "encoded value preserves the same ordering as
> the base Java type" means?
>

Let's say A and B represent two different instances of the same Java type,
like a double. Then A < B (using the language's comparison operator) iff
encode(A) < encode(B) (note that the encoded versions are compared
lexicographically).
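
As a concrete illustration of this property, here is a small sketch using
long rather than double. It assumes the encoded bytes are compared as
unsigned values, which is one reading of "lexicographically" here. The class
and method names are hypothetical and are not part of any Beam coder. Plain
big-endian two's complement breaks the property for negative numbers, while
flipping the sign bit before writing restores it.

```java
import java.nio.ByteBuffer;

public class OrderPreservingLongEncoding {
  // Plain big-endian two's complement. Negative values start with a 1 bit,
  // so byte-wise they compare greater than positive values.
  static byte[] encodePlain(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
  }

  // Flipping the sign bit maps Long.MIN_VALUE..Long.MAX_VALUE onto
  // 0x00...00..0xFF...FF, so unsigned byte order matches numeric order.
  static byte[] encodeOrdered(long v) {
    return ByteBuffer.allocate(Long.BYTES).putLong(v ^ Long.MIN_VALUE).array();
  }

  // Lexicographic comparison that treats each byte as unsigned.
  static int compareLex(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (c != 0) {
        return c;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    long a = -5L;
    long b = 3L;
    // a < b numerically, but the plain encoding sorts a after b.
    System.out.println(compareLex(encodePlain(a), encodePlain(b)) < 0);
    // The sign-flipped encoding agrees with the numeric order.
    System.out.println(compareLex(encodeOrdered(a), encodeOrdered(b)) < 0);
  }
}
```

A coder using something like encodePlain could not carry the proposed
PreservesOrder tag, whereas one using encodeOrdered could.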



>
> -Rui
>


Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Rui Wang
>
> A few obvious problems with this code:
>   1. Removing the elements already processed from the bag requires
> clearing and rewriting the entire bag. This is O(n^2) in the number of
> input trades.
>
Why isn't clearing and rewriting the trade state O(2 * n)?


> public interface SortedMultimapState<K, V> extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key.
>   ReadableState<Iterable<V>> get(K key);
>   // Return all entries in the map.
>   ReadableState<Iterable<KV<K, V>>> allEntries();
>   // Return all entries in the map with keys <= limit. Returned elements
>   // are sorted by the key.
>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>
>   // Remove all values with the given key.
>   void remove(K key);
>   // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);
>
Will removeUntilExcl(K limit) also be useful? It will remove all entries in
the map with keys < limit.


> Runners will sort based on the encoded value of the key. In order to make
> this easier for users, I propose that we introduce a new tag on Coders
> *PreservesOrder*. A Coder that contains this tag guarantees that the
> encoded value preserves the same ordering as the base Java type.
>

Could you clarify what "encoded value preserves the same ordering as
the base Java type" means?


-Rui


Re: DISCUSS: Sorted MapState API

2019-05-23 Thread Lukasz Cwik
I would suggest that we drop MapState and instead support MultimapState
without ordering as a first pass and potentially add ordering later.

Inside an SDK we would be able to build a "sorted" multimap state that uses
a prefix of the key and implements radix-based sorting. Any time a prefix
has too many elements, you rebalance and split it into two and use a slightly
longer prefix. Similarly, removing elements makes prefixes merge and become
shorter.




On Thu, May 23, 2019 at 10:36 AM Reuven Lax  wrote:

> Beam's state API is intended to be useful as an alternative aggregation
> method. Stateful ParDos can aggregate elements into state and set timers to
> read the state.  Currently Beam's state API supports two ways of storing
> collections of elements: BagState and MapState. BagState is the one most
> commonly used for these aggregations, likely for the practical reason that it's
> the only one that has a scalable implementation on several runners.
>
> There are two major limitations of bag state:
> * It is immutable: To delete an element from BagState, you must read and
> rewrite the entire bag.
> * It is unsorted: Almost all use cases of BagState need the elements in
> some order (usually timestamp order). Since BagState is unsorted, every
> time it's read it must be read in full and sorted in the ParDo.
>
> To make this concrete, consider a ParDo that does timeseries joins. There
> are two types of input events: trades and quotes. A quote tells the
> current price of a stock. The goal of the ParDo is to pair each stock's
> trades with the closest quote that precedes it in time.
>
> A simple implementation would look as follows (pseudocode):
>
> void process(@Element Event event, @Timestamp Instant timestamp) {
>   if (event.type() == TRADE) {
>     tradesBag.add(event.trade());
>     // Only ever move the timer earlier.
>     if (timestamp < existingTimerTime) {
>       setTimer(processTradeTimer, timestamp);
>     }
>   } else {
>     quotesBag.add(event.quote());
>   }
> }
>
> void onTimer(@Timestamp Instant timestamp) {
>   List<Quote> quotes = sort(quotesBag.get());
>   List<Trade> trades = sort(tradesBag.get());
>   for (Trade trade : trades) {
>     if (trade.timestamp() > timestamp) break;
>     Quote quote = findClosestQuote(quotes, trade.timestamp());
>     // Output KV<trade, quote>
>   }
>
>   List<Trade> newTrades = /* remaining trades. */
>   tradesBag.clear();
>   // Rewrite the bag with only the unprocessed trades.
>   newTrades.forEach(tradesBag::add);
> }
>
> A few obvious problems with this code:
>   1. Removing the elements already processed from the bag requires
> clearing and rewriting the entire bag. This is O(n^2) in the number of
> input trades.
>   2. Both bags need to be fetched and sorted on every timer. This is
> O(n^2*logn) in the size of the input PCollections.
>   3. BagState is designed so that the input iterables can be large; the
> entire BagState shouldn't have to fit in memory. However, this stops working
> when you need to sort it, because we must page the entire BagState into
> memory in order to sort it.
>
> Some of these problems can be alleviated with separate GC, caching of
> BagState fetches, etc. However this results in increasingly complicated
> code, and does not solve all the problems above.
>
> I'd like to propose a new state type to solve the above problems:
> SortedMultimapState. SortedMultimapState could have the following signature:
>
> public interface SortedMultimapState<K, V> extends State {
>   // Add a value to the map.
>   void put(K key, V value);
>   // Get all values for a given key.
>   ReadableState<Iterable<V>> get(K key);
>   // Return all entries in the map.
>   ReadableState<Iterable<KV<K, V>>> allEntries();
>   // Return all entries in the map with keys <= limit. Returned elements
>   // are sorted by the key.
>   ReadableState<Iterable<KV<K, V>>> entriesUntil(K limit);
>   // Remove all values with the given key.
>   void remove(K key);
>   // Remove all entries in the map with keys <= limit.
>   void removeUntil(K limit);
> }
>
> Runners will sort based on the encoded value of the key. In order to make
> this easier for users, I propose that we introduce a new tag on Coders
> *PreservesOrder*. A Coder that contains this tag guarantees that the
> encoded value preserves the same ordering as the base Java type.
>
> The above logic can now be rewritten! There's no need to explicitly sort,
> since the elements are returned sorted by key (in this case the key will be
> the timestamp). We can explicitly fetch only up to the current timer
> timestamp, so there's no need to pull the entire set into memory.
> Furthermore, we can efficiently delete ranges, with no more need to rewrite
> the entire set each time.
>
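> void onTimer(@Timestamp Instant timestamp) {
>   // Assumes both maps are keyed by the element timestamp (Instant).
>   // Reads are bounded by the timer timestamp and come back sorted by key.
>   Iterable<KV<Instant, Quote>> quotes = quotesMap.entriesUntil(timestamp).read();
>   Iterable<KV<Instant, Trade>> trades = tradesMap.entriesUntil(timestamp).read();
>   for (KV<Instant, Trade> trade : trades) {
>     Quote quote = findClosestQuote(quotes, trade.getKey());
>     // Output KV<trade, quote>
>   }
>   tradesMap.removeUntil(timestamp);
> }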
>
> Comments?
>
> Reuven
>
>
>
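
To make the proposed semantics of entriesUntil and removeUntil concrete, here
is a small in-memory sketch backed by a TreeMap. It is only an illustration
under simplifying assumptions: InMemorySortedMultimap is a hypothetical name,
keys are ordered with Comparable rather than by their encoded bytes, and
results are returned directly instead of through ReadableState.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for the proposed SortedMultimapState.
public class InMemorySortedMultimap<K extends Comparable<K>, V> {
  private final TreeMap<K, List<V>> entries = new TreeMap<>();

  // Add a value under the given key; duplicate keys are allowed.
  public void put(K key, V value) {
    entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  // Return all entries with keys <= limit, sorted by key.
  public List<Map.Entry<K, V>> entriesUntil(K limit) {
    List<Map.Entry<K, V>> result = new ArrayList<>();
    for (Map.Entry<K, List<V>> e : entries.headMap(limit, true).entrySet()) {
      for (V value : e.getValue()) {
        result.add(new SimpleImmutableEntry<>(e.getKey(), value));
      }
    }
    return result;
  }

  // Remove all values with the given key.
  public void remove(K key) {
    entries.remove(key);
  }

  // Remove all entries with keys <= limit. headMap returns a live view,
  // so clearing it deletes the range from the backing TreeMap.
  public void removeUntil(K limit) {
    entries.headMap(limit, true).clear();
  }

  public static void main(String[] args) {
    InMemorySortedMultimap<Long, String> trades = new InMemorySortedMultimap<>();
    trades.put(30L, "t3");
    trades.put(10L, "t1");
    trades.put(20L, "t2a");
    trades.put(20L, "t2b");

    // Everything at or before timestamp 20, in key order.
    System.out.println(trades.entriesUntil(20L));
    // Drop the processed range without rewriting the remainder.
    trades.removeUntil(20L);
    System.out.println(trades.entriesUntil(Long.MAX_VALUE));
  }
}
```

Neither the bounded read nor the range delete touches the entries after the
limit, which is the property the timer-driven join above relies on.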