I really appreciate your taking the time to reply.

I'm currently most interested in the Flink runner -- my use cases have been streaming, with KafkaIO.  But I am also interested in any broader guarantees in the cross-runner technical contract, in batch use cases, and in what additional optimizations are allowed (and may exist) for data stores that support random access.

I'm hearing that there may be a few different strategies for materialization. Which parts am I getting right/wrong, and what am I missing? And which type of materialization do GBK and side inputs use in general -- and specifically in the Flink runner, if you happen to know?

1. A stock Java List (or similar) that stores everything in RAM.  The reason side inputs are required to fit in memory is to leave runners the option of materializing this way, NOT because runners MUST materialize this way.  In other words, a side input that doesn't fit in memory might happen to work, but you can't rely on that behavior.

2a. A List that is a lens of sorts into a runner-proprietary representation on disk.  This would limit the maximum materialization size to data sets that fit on a single machine, but not necessarily in RAM, and is a case where side inputs might get cached when GBK does not.

2b. A List that is a lens into a runner-specific representation that doesn't have to fit on a single machine.

3. A List that is a lens directly into underlying random-access IO -- for example, the List might privately contain primary-key values from a table in an append-only database system and go back to the database to fetch values when user code calls .get() (see the sketch after this list). This is also a case where side inputs might get cached when GBK does not.

4. An Iterable that is procedurally defined, can only be traversed once, and uses O(1) incremental storage.
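
To make the "lens" idea in 2a/2b/3 concrete, here is the sketch mentioned above -- purely hypothetical, not any runner's actual code: a read-only List that keeps only keys in memory and fetches each value from a random-access store on demand.

    import java.util.AbstractList;
    import java.util.List;
    import java.util.function.Function;

    // Hypothetical illustration only -- not an actual runner implementation.
    // A read-only List "lens" that holds just primary keys and fetches the
    // corresponding value from an external random-access store on get().
    class LazyLookupList<K, V> extends AbstractList<V> {
      private final List<K> keys;          // small: only keys live in memory
      private final Function<K, V> fetch;  // e.g. a database point lookup

      LazyLookupList(List<K> keys, Function<K, V> fetch) {
        this.keys = keys;
        this.fetch = fetch;
      }

      @Override
      public V get(int index) {
        return fetch.apply(keys.get(index));  // round-trip to the store per access
      }

      @Override
      public int size() {
        return keys.size();
      }
    }

A runner could hand such a list to user code as the result of View.asList() while the actual bytes stay in its own storage.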

On 09/25/2017 07:17 PM, Kenneth Knowles wrote:
This is a great question. Both a list side input and a GroupByKey on one key collect all of the contents of a window into a single iterable.

There are a few high-level differences: Side inputs can be controlled by triggers, but may have additional latency compared to a GBK controlled by a trigger -- technically there is no real spec here, just "eventually". On the other hand, side inputs are expected to be read many times, so runners will likely add caching that would not make sense for the output of a GBK; a list side input is therefore probably cheaper to read repeatedly.
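
As a concrete sketch of the trigger point (Java SDK; the window, trigger, and delay choices below are arbitrary illustrations, not recommendations):

    PCollection<String> input = ...;

    // A list side input whose contents are re-materialized as panes fire.
    PCollectionView<List<String>> view = input
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10)))
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(30))))
            .withAllowedLateness(Duration.ZERO)
            .accumulatingFiredPanes())
        .apply(View.asList());

Exactly when a consuming ParDo observes a newly fired pane is the "eventually" part above.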

Dropping to a lower level of abstraction, a side input is generally a view on a materialization of an entire PCollection. A runner knows this and can choose a suitable materialization to support it. On the other hand, a GroupByKey will generally use the runner's underlying networked shuffle implementation; there's probably needless overhead since all data is being shuffled to a single key.

I've tried to answer somewhat vaguely, in terms of the Beam model, since this is an area where a Beam runner has a lot of discretion. The answers could get more specific if you are interested in a particular runner.

Kenn

On Sun, Sep 24, 2017 at 5:27 PM, Wesley Tanaka <[email protected]> wrote:

    What are the differences between the side input approach and


    .apply(WithKeys.of(31337))    // -> PCollection<KV<Integer, T>>
    .apply(GroupByKey.create())   // -> PCollection<KV<Integer, Iterable<T>>>
    .apply(Values.create())       // -> PCollection<Iterable<T>>

    ?



    ---
    Wesley Tanaka
    https://wtanaka.com/

    On Monday, July 10, 2017, 5:55:44 PM HST, Kenneth Knowles
    <[email protected]> wrote:

    Hi bluejoe,

    Assuming you know that you have a very small PCollection, the way
    you can do this is by reading it as a side input. See
    https://beam.apache.org/documentation/programming-guide/#transforms-sideio

    Here's a snippet as a teaser to read the docs I link to:

        PCollection<Whatever> mySmallCollection = ...
        PCollectionView<List<Whatever>> mySideInput =
            mySmallCollection.apply(View.asList());

        someOtherCollection.apply(ParDo.of(...).withSideInputs(mySideInput));
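
    Inside the elided DoFn, the view is read per element via the sideInput
    accessor -- a sketch with hypothetical element types:

        someOtherCollection.apply(ParDo.of(
            new DoFn<OtherThing, Result>() {  // OtherThing/Result: hypothetical
              @ProcessElement
              public void processElement(ProcessContext c) {
                // Fetch the materialized list for this element's window.
                List<Whatever> smallList = c.sideInput(mySideInput);
                // ... use smallList together with c.element() ...
              }
            }).withSideInputs(mySideInput));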

    This won't work if your collection is actually large, where
    "large" means too big for memory. And it could be slow depending
    on your runner and access pattern, even with a medium-sized
    PCollection -- i.e., one that fits in memory but is still a lot
    to read without parallelism.

    Hope that helps,

    Kenn

    On Mon, Jul 10, 2017 at 8:35 PM, bluejoe <[email protected]> wrote:
    > Hi,
    >
    > Can anybody tell me how to convert a PCollection to an Array in
    > local memory?
    > I noticed there is a Create.of() which converts a local list
    > into a PCollection.
    > But how to do the conversion in the inverse direction?
    >
    > Best regards,
    > bluejoe
    >
    >



