That makes sense. Keeping the main input as [int, int] and using the side
input to provide the [int => data] mapping needed to compute the
[int, int, float] seems like a good approach.

If there are enough distinct ints and none of them are hot keys, I would
perform a GBK so that at least one of your side input lookups is really
easy to cache, transforming your pipeline to:
[int, int] -> GBK -> [int, iterable<int>] -> DoFn(for each [int, int] pair,
perform computation) -> [int, int, float]
If you do have hot keys, then group by a key formed by concatenating one of
the ints with a number in [0, Y), which lets you distribute the work Y
ways. Keeping Y small will improve caching, while a larger Y helps more
with hot keys. A rough sketch of the salted variant follows.
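
Here is a minimal sketch of that salted GBK, assuming a
PCollection<KV<Integer, Integer>> named "pairs", a map side input
PCollectionView<Map<Integer, MyData>> named "dataView" (e.g. built with
View.asMap()), and a placeholder computeFloat(MyData, int). All of those
names are made up for illustration, not from your pipeline:

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

final int Y = 16;  // shard count: small Y favors caching, larger Y spreads hot keys more

// Re-key each [int, int] pair to ((firstInt, shard), secondInt) so a hot
// firstInt is split across Y groups.
PCollection<KV<KV<Integer, Integer>, Integer>> salted = pairs.apply("SaltKeys",
    ParDo.of(new DoFn<KV<Integer, Integer>, KV<KV<Integer, Integer>, Integer>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        int shard = ThreadLocalRandom.current().nextInt(Y);
        c.output(KV.of(KV.of(c.element().getKey(), shard), c.element().getValue()));
      }
    }));

// Group, then do one side input lookup per group instead of one per pair.
PCollection<KV<KV<Integer, Integer>, Float>> results = salted
    .apply(GroupByKey.create())
    .apply("Compute", ParDo.of(
        new DoFn<KV<KV<Integer, Integer>, Iterable<Integer>>, KV<KV<Integer, Integer>, Float>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            int firstInt = c.element().getKey().getKey();
            // One lookup per group; the cached map segment is reused for
            // every pair in the group.
            MyData data = c.sideInput(dataView).get(firstInt);
            for (int secondInt : c.element().getValue()) {
              c.output(KV.of(KV.of(firstInt, secondInt), computeFloat(data, secondInt)));
            }
          }
        }).withSideInputs(dataView));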

On Fri, Jul 7, 2017 at 8:26 AM, Randal Moore <[email protected]> wrote:

> Sorry for being confusing - I am still grasping for the correct terms to
> use when referring to some of these things. I think that made a mess of
> the question.
>
> I think #2 above means that I'd get the behavior I want from the side
> input map running on Dataflow. I will give that a try.
>
> Let me make another attempt at the confusing question.  My pipeline source
> is a large number of [Int, Int] pairs and the pipeline transforms that to
> [Int, Int, Float]. To compute the Float, I need a relatively large set of
> data for each integer.  A given input integer value might occur [hundreds
> of] thousands of times within a given "window". The *assumed bad* design
> variant (at least what I assume is bad) is to transform the [Int, Int]
> pairs into [Int => Data, Int => Data] and then perform the calculation
> that produces the [Int, Int, Float]. That seems like it would inflate the
> "window" from KB or MB to TB - with tons of redundant data within the
> overall flow.
>
> I had been assuming that a side input map of the large data sets would be
> better optimized by Dataflow.  Just wanted to confirm that.
>
> Does that make any sense?
> rdm
>
>
> On Thu, Jul 6, 2017 at 10:38 AM Lukasz Cwik <[email protected]> wrote:
>
>> #1: For all runners, the side input needs to be ready (data needs to
>> exist for the given window) before the main input is executed, which
>> means that in your case the whole side input will be materialized first.
>>
>> #2: For Dataflow, a map/multimap based side input is loaded lazily in
>> parts based upon which key is being accessed. Each segment of the map is
>> cached in memory (using an LRU policy) and loading the data remotely is
>> the largest cost in such a system. Depending on how large your main input
>> is, performing a group by key on your access key will speed up your
>> lookups (because you'll get a lot more cache hits), but you have to weigh
>> the cost of doing the GBK vs. the speedup in side input usage.
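>>
>> A minimal sketch of such a map side input and per-key lookup, assuming a
>> hypothetical PCollection<KV<Integer, MyData>> named "referenceData", your
>> [Int, Int] main input as "pairs", and a placeholder computeFloat(MyData,
>> int) - all placeholder names, not from your pipeline:
>>
>> import java.util.Map;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.transforms.ParDo;
>> import org.apache.beam.sdk.transforms.View;
>> import org.apache.beam.sdk.values.KV;
>> import org.apache.beam.sdk.values.PCollection;
>> import org.apache.beam.sdk.values.PCollectionView;
>>
>> final PCollectionView<Map<Integer, MyData>> dataView =
>>     referenceData.apply(View.<Integer, MyData>asMap());
>>
>> PCollection<KV<Integer, Float>> out = pairs.apply("Lookup",
>>     ParDo.of(new DoFn<KV<Integer, Integer>, KV<Integer, Float>>() {
>>       @ProcessElement
>>       public void process(ProcessContext c) {
>>         // Each get() pulls (and LRU-caches) only the segment of the map
>>         // holding this key, rather than the whole side input.
>>         MyData data = c.sideInput(dataView).get(c.element().getKey());
>>         c.output(KV.of(c.element().getValue(),
>>             computeFloat(data, c.element().getValue())));
>>       }
>>     }).withSideInputs(dataView));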
>>
>> What do you mean by "expanding the tuples to the expanded data"?
>> * Are you trying to say that typically you'll look up the same value 100+
>> times from the side input?
>> ** In this case performing a GBK based upon your lookup key may be of
>> benefit.
>> * Are you trying to say that you could store the data within the side
>> input instead of just the index, but it would be 100 times larger?
>> ** A map based side input whose values are 400 bytes instead of 4 bytes
>> isn't going to change the lookup cost much.
>>
>>
>>
>> On Wed, Jul 5, 2017 at 6:22 PM, Randal Moore <[email protected]> wrote:
>>
>>> Based on my understanding so far, I'm targeting Dataflow with a batch
>>> pipeline. Just starting to experiment with setup/teardown on the local
>>> runner - that might work fine.
>>>
>>> Somewhat intrigued by the side inputs, though.  The pipeline might
>>> iterate over 1,000,000 tuples of two integers.  The integers are indices
>>> into a database of data. A given integer will be repeated in the inputs
>>> many times.  Am I prematurely optimizing by ruling out expanding the
>>> tuples into the full data, given that each value might be expanded 100
>>> or more times? As side inputs, it might expand to ~100GB.  Expanding the
>>> main input would be significantly bigger.
>>>
>>> #1 How does Dataflow schedule the pipeline with a map side input - does
>>> it wait until the whole map is collected?
>>> #2 Can the DoFn specify that it depends on only specific keys of the
>>> side input map?  Does that affect the scheduling of the DoFn?
>>>
>>> Thanks for any pointers...
>>> rdm
>>>
>>> On Wed, Jul 5, 2017 at 4:58 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> That should have said:
>>>> ~100s MiBs per window in streaming pipelines
>>>>
>>>> On Wed, Jul 5, 2017 at 2:58 PM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> #1, supported side input sizes and performance are specific to a
>>>>> runner. For example, I know that Dataflow supports side inputs which are
>>>>> 1+ TiB (aggregate) in batch pipelines and ~100s MiBs per window, because
>>>>> there have been several one-off benchmarks/runs. What kinds of sizes/use
>>>>> cases do you want to support? Some runners will do a much better job with
>>>>> really small side inputs while others will be better with really large
>>>>> side inputs.
>>>>>
>>>>> #2, this depends on which library you're using to perform the REST
>>>>> calls and whether it is thread safe. DoFns can be shared across multiple
>>>>> bundles and can contain methods marked with @Setup/@Teardown which only
>>>>> get invoked once per DoFn instance (which is relatively infrequent), so
>>>>> you could store an instance per DoFn instead of a singleton if the REST
>>>>> library is not thread safe.
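>>>>>
>>>>> A minimal sketch of that pattern, assuming a hypothetical,
>>>>> non-thread-safe RestClient class with a lookupScore() call - these are
>>>>> placeholders, not a real library:
>>>>>
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.apache.beam.sdk.values.KV;
>>>>>
>>>>> class EnrichFn extends DoFn<KV<Integer, Integer>, KV<Integer, Float>> {
>>>>>   private transient RestClient client;  // never serialized with the DoFn
>>>>>
>>>>>   @Setup
>>>>>   public void setup() {
>>>>>     // Runs once per DoFn instance on the worker, so the client (and its
>>>>>     // background threads) is created there and reused across bundles.
>>>>>     client = new RestClient();
>>>>>   }
>>>>>
>>>>>   @ProcessElement
>>>>>   public void process(ProcessContext c) {
>>>>>     c.output(KV.of(c.element().getKey(),
>>>>>         client.lookupScore(c.element().getKey(), c.element().getValue())));
>>>>>   }
>>>>>
>>>>>   @Teardown
>>>>>   public void teardown() {
>>>>>     client.close();  // release connections/threads when the instance dies
>>>>>   }
>>>>> }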
>>>>>
>>>>> On Wed, Jul 5, 2017 at 2:45 PM, Randal Moore <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I have a step in my Beam pipeline that needs some data from a REST
>>>>>> service. The data acquired from the REST service depends on the context
>>>>>> of the data being processed and is relatively large. The REST client I
>>>>>> am using isn't serializable - nor is it likely possible to make it so
>>>>>> (background threads, etc.).
>>>>>>
>>>>>> #1 What are the practical limits to the size of side inputs (e.g., I
>>>>>> could try to gather all the data from the REST service and provide it
>>>>>> as a side input)?
>>>>>>
>>>>>> #2 Assuming that using the REST client is the better option, would a
>>>>>> singleton instance be a safe way to instantiate the REST client?
>>>>>>
>>>>>> Thanks,
>>>>>> rdm
>>>>>>
>>>>>
>>>>>
>>>>
>>
