I really appreciate your taking the time to reply.

I'm currently most interested in the Flink runner -- my use cases have
been streaming and KafkaIO. But I am also interested in any broader
guarantees in the cross-runner technical contract, in batch use cases,
and in what additional optimizations are allowed and may exist for data
stores that allow random access.

I'm hearing that there may be a few different strategies for
materialization. Which parts am I getting right/wrong and what am I
missing? And what type of materialization do GBK and side inputs use in
general, and specifically in the Flink runner, if you happen to know?

1. A stock Java List (or similar) that stores everything in RAM. The
reason why side inputs are required to fit in memory is to leave the
option open for runners to choose to materialize this way, NOT because
runners MUST materialize this way. In other words a side input that
doesn't fit in memory might happen to work, but you can't rely on that
behavior.
2a. A list that was a lens of sorts into a runner-proprietary
representation on disk. This would limit max materialization size to
data sets that fit on a computer, but not necessarily to RAM, and is a
case where side inputs might get cached when GBK does not.
2b. A list that was a lens into a runner-specific representation that
didn't have to fit on a single computer.
3. A list that was a lens directly into an underlying random-access IO,
like if the List privately contained primary key values from a database
table in an append-only database system and went back to the database to
fetch values when user code called .get(). This is also a case where
side inputs might get cached when GBK does not.
4. An iterable that was procedurally defined and could only be traversed
once and used O(1) incremental storage.
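
To make strategies 3 and 4 above concrete, here is a minimal plain-Java
sketch. It is not Beam or Flink code and does not describe what any
runner actually does. The class names (MaterializationSketch,
LazyLookupList, OneShotRange) and the fetch function standing in for a
database read are hypothetical, chosen only for illustration.

```java
import java.util.AbstractList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical illustration only; none of these classes are Beam or Flink APIs. */
public class MaterializationSketch {

  /** Strategy 3: a List that stores only keys and fetches each value on get(). */
  static final class LazyLookupList<K, V> extends AbstractList<V> {
    private final List<K> keys;
    private final Function<K, V> fetch; // stands in for a read against an external store
    private int fetchCount = 0;

    LazyLookupList(List<K> keys, Function<K, V> fetch) {
      this.keys = keys;
      this.fetch = fetch;
    }

    @Override
    public V get(int index) {
      fetchCount++; // every call goes back to the store; nothing is cached here
      return fetch.apply(keys.get(index));
    }

    @Override
    public int size() {
      return keys.size();
    }

    int fetchCount() {
      return fetchCount;
    }
  }

  /** Strategy 4: an Iterable that is computed on the fly and can be traversed only once. */
  static final class OneShotRange implements Iterable<Integer> {
    private final int endExclusive;
    private boolean consumed = false;

    OneShotRange(int endExclusive) {
      this.endExclusive = endExclusive;
    }

    @Override
    public Iterator<Integer> iterator() {
      if (consumed) {
        throw new IllegalStateException("already traversed");
      }
      consumed = true;
      return new Iterator<Integer>() {
        private int next = 0; // the only state kept, so storage is O(1)

        @Override
        public boolean hasNext() {
          return next < endExclusive;
        }

        @Override
        public Integer next() {
          if (!hasNext()) {
            throw new NoSuchElementException();
          }
          return next++;
        }
      };
    }
  }

  public static void main(String[] args) {
    // A HashMap plays the role of the external table in this sketch.
    Map<Integer, String> table = new HashMap<>();
    table.put(1, "a");
    table.put(2, "b");
    LazyLookupList<Integer, String> view =
        new LazyLookupList<>(Arrays.asList(1, 2), table::get);
    System.out.println(view.get(1)); // value is fetched on demand

    int sum = 0;
    for (int i : new OneShotRange(4)) {
      sum += i;
    }
    System.out.println(sum);
  }
}
```

From the caller's side both look like an ordinary List or Iterable, so
the interface alone does not reveal which strategy is behind it.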
On 09/25/2017 07:17 PM, Kenneth Knowles wrote:
This is a great question. Both a list side input and a GroupByKey on
one key collect all of the contents of a window into a single iterable.
There are a few high-level differences: Side inputs can be controlled
by triggers, but may have additional latency compared to a GBK
controlled by a trigger - technically there is no real spec here, just
"eventually". On the other hand, side inputs are expected to be read
many times, so runners will likely add caching that would not make
sense for the output of a GBK, so a list side input is probably
cheaper to read many times.
Dropping to a lower level of abstraction, a side input is generally a
view on a materialization of an entire PCollection. A runner knows
this and can choose a suitable materialization to support it. On the
other hand, a GroupByKey will generally use the runner's underlying
networked shuffle implementation; there's probably needless overhead
since all data is being shuffled to a single key.
I've tried to answer somewhat vaguely, in terms of the Beam model,
since this is an area where a Beam runner has a lot of discretion. The
answers could get more specific if you are interested in a particular
runner.
Kenn
On Sun, Sep 24, 2017 at 5:27 PM, Wesley Tanaka <[email protected]> wrote:
What are the differences between the side input approach and
.apply(WithKeys.of(31337))
.apply(GroupByKey.create())
.apply(Values.create())
?
---
Wesley Tanaka
https://wtanaka.com/
On Monday, July 10, 2017, 5:55:44 PM HST, Kenneth Knowles
<[email protected]> wrote:
Hi bluejoe,
Assuming you know that you have a very small PCollection, the way
you can do this is by reading it as a side input. See
https://beam.apache.org/documentation/programming-guide/#transforms-sideio
Here's a snippet as a teaser to read the docs I link to:
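PCollection<Whatever> mySmallCollection = ...;
// Materialize the small collection as a List-valued side input.
PCollectionView<List<Whatever>> mySideInput =
    mySmallCollection.apply(View.asList());
// Make the view readable from inside the ParDo.
someOtherCollection.apply(ParDo.of(...).withSideInputs(mySideInput));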
This won't work if your collection is actually large, where
"large" means too big for memory. And it could be slow depending
on your runner and access pattern, even if you have a medium-sized
PCollection, aka fits in memory but still a lot to read without
parallelism.
Hope that helps,
Kenn
On Mon, Jul 10, 2017 at 8:35 PM, bluejoe <[email protected]> wrote:
> Hi,
>
> Can anybody tell me how to convert a PCollection to an Array in
> local memory?
> I noticed there is a Create.of() which converts a local list
> into a PCollection
> But how to do the conversion in an inverse direction?
>
> Best regards,
> bluejoe
>
>