I like the way this looks.  I'll do some experimenting with the GBK idea!
Thanks,
rdm

On Fri, Jul 7, 2017 at 11:39 AM Lukasz Cwik <[email protected]> wrote:

> That makes sense. Keeping the main input as [int, int] and using the side
> input to provide the mapping from [int => data] to be able to compute the
> [int, int, float] seems like a good approach.
>
> If there are enough distinct ints and none of them are hot keys, I would
> perform a GBK so at least one of your side input lookups is really easy to
> cache, transforming your pipeline to:
> [int, int] -> GBK -> [int, iterable<int>] -> DoFn(for each [int, int]
> pair, perform computation) -> [int, int, float]
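> A rough sketch of that grouped shape in plain Python (the dict stands in
> for Beam's GroupByKey, and compute is a hypothetical stand-in for the
> per-pair float computation):

```python
from collections import defaultdict

def compute(a, b):
    # Hypothetical stand-in for the lookup-plus-float computation.
    return a / (b + 1.0)

pairs = [(1, 10), (1, 20), (2, 30)]

# GBK: [int, int] -> [int, iterable<int>]
grouped = defaultdict(list)
for k, v in pairs:
    grouped[k].append(v)

# DoFn: for each [int, int] pair, perform computation -> [int, int, float]
out = [(k, v, compute(k, v)) for k, vs in grouped.items() for v in vs]
assert sorted(out) == [(1, 10, 1 / 11), (1, 20, 1 / 21), (2, 30, 2 / 31)]
```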
> If you do have hot keys, then group by a key formed by concatenating one
> of the ints with a number in [0, Y); this allows you to distribute the
> work Y ways. Keeping Y small will improve caching, while a larger Y helps
> with hot keys.
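> The salting idea might look like this in plain Python (Y and the key shape
> are illustrative; a real pipeline would feed the salted key into a
> GroupByKey):

```python
import random
from collections import defaultdict

random.seed(0)
Y = 4  # small Y improves caching; larger Y spreads a hot key further

def salted_key(k):
    # Pair the key with a number in [0, Y) to split a hot key Y ways.
    return (k, random.randrange(Y))

pairs = [(7, v) for v in range(1000)]  # one hot key: 7

groups = defaultdict(list)
for k, v in pairs:
    groups[salted_key(k)].append(v)

# The hot key's work is now distributed across up to Y groups,
# and no elements are lost in the process.
assert len(groups) <= Y
assert sum(len(vs) for vs in groups.values()) == 1000
```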
>
> On Fri, Jul 7, 2017 at 8:26 AM, Randal Moore <[email protected]> wrote:
>
>> Sorry for being confusing - I am still grasping for the correct
>> terminology to refer to some of these things. I think that made a mess of
>> the question.
>>
>> I think #2 above means that I'd like the behavior of the side input map
>> running on Dataflow. I will give that a try.
>>
>> Let me make another attempt at the confusing question.  My pipeline
>> source is a large number of [Int, Int] pairs and the pipeline transforms
>> that to [Int, Int, Float]. To compute the Float, I need a relatively large
>> set of data for each integer.  A given input integer value might occur
>> [hundreds of] thousands of times within a given "window". The
>> *assumed bad* design variant (at least what I assume is bad) is to
>> transform the [Int, Int] pairs into [Int => Data, Int => Data] and then
>> perform the calculation that produces the [Int, Int, Float]. That seems
>> like it would transform the "window" from KB or MB to TB - with tons of
>> redundant data within the overall flow.
>>
>> I had been assuming that the side input map of the large data sets would
>> be better optimized by Dataflow.  Just wanted to confirm that.
>>
>> Does that make any sense?
>> rdm
>>
>>
>> On Thu, Jul 6, 2017 at 10:38 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> #1: For all runners, the side input needs to be ready (data needs to
>>> exist for the given window) before the main input is executed which means
>>> that in your case the whole side input will be materialized before the main
>>> input is executed.
>>>
>>> #2: For Dataflow, a map/multimap based side input is loaded lazily in
>>> parts based upon which key is being accessed. Each segment of the map is
>>> cached in memory (using an LRU policy), and loading the data remotely is
>>> the largest cost in such a system. Depending on how large your main input
>>> is, performing a group by key on your access key will speed up your
>>> lookups (because you'll get a lot more cache hits), but you have to weigh
>>> the cost of doing the GBK vs the speed up in side input usage.
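>>> Why grouping raises the cache hit rate can be sketched in plain Python
>>> (functools.lru_cache stands in for the per-segment LRU, and load_segment
>>> is a hypothetical stand-in for the remote fetch):

```python
from functools import lru_cache

remote_loads = {"count": 0}

@lru_cache(maxsize=2)  # deliberately tiny, mimicking the segment LRU
def load_segment(key):
    # Stand-in for the expensive remote fetch of one side-input segment.
    remote_loads["count"] += 1
    return {"data_for": key}

# Ungrouped access: keys interleave, so the tiny cache thrashes.
for key in [1, 2, 3, 1, 2, 3]:
    load_segment(key)
thrashed = remote_loads["count"]  # every access is a remote load: 6

load_segment.cache_clear()
remote_loads["count"] = 0

# Grouped (post-GBK) access: repeats are adjacent, so they hit the cache.
for key in [1, 1, 2, 2, 3, 3]:
    load_segment(key)
grouped = remote_loads["count"]  # one remote load per distinct key: 3

assert grouped < thrashed
```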
>>>
>>> What do you mean by "expanding the tuples to the expanded data"?
>>> * Are you trying to say that typically you'll look up the same value
>>> 100+ times from the side input
>>> ** In this case performing a GBK based upon your lookup key may be of
>>> benefit
>>> * Are you trying to say that you could have the data stored within the
>>> side input instead of just the index but it would be 100 times larger?
>>> ** A map based side input whose values are 4 bytes vs 400 bytes isn't
>>> going to change much in lookup cost
>>>
>>>
>>>
>>> On Wed, Jul 5, 2017 at 6:22 PM, Randal Moore <[email protected]>
>>> wrote:
>>>
>>>> Based on my understanding so far, I'm targeting Dataflow with a batch
>>>> pipeline. Just starting to experiment with the setup/teardown with the
>>>> local runner - that might work fine.
>>>>
>>>> Somewhat intrigued with the side inputs, though.  The pipeline might
>>>> iterate over 1,000,000 tuples of two integers.  The integers are indices
>>>> into a database of data. A given integer will be repeated in the inputs
>>>> many times.  Am I prematurely optimizing to rule out expanding the tuples
>>>> to the expanded data as each value might be expanded 100 or more times? As
>>>> side inputs, it might expand to ~100GB.  Expanding the input would be
>>>> significantly bigger.
>>>>
>>>> #1 how does Dataflow schedule the pipeline with a map side input - does
>>>> it wait until the whole map is collected?
>>>> #2 can the DoFn specify that it depends on only specific keys of the
>>>> side input map?  does that affect the scheduling of the DoFn?
>>>>
>>>> Thanks for any pointers...
>>>> rdm
>>>>
>>>> On Wed, Jul 5, 2017 at 4:58 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> That should have said:
>>>>> ~100s MiBs per window in streaming pipelines
>>>>>
>>>>> On Wed, Jul 5, 2017 at 2:58 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> #1, side inputs supported sizes and performance are specific to a
>>>>>> runner. For example, I know that Dataflow supports side inputs which
>>>>>> are 1+ TiB (aggregate) in batch pipelines and ~100s MiBs per window
>>>>>> because there have been several one-off benchmarks/runs. What kinds of
>>>>>> sizes/use cases do you want to support? Some runners will do a much
>>>>>> better job with really small side inputs while others will be better
>>>>>> with really large side inputs.
>>>>>>
>>>>>> #2, this depends on which library you're using to perform the REST
>>>>>> calls and whether it is thread safe. DoFns can be shared across
>>>>>> multiple bundles and can contain methods marked with @Setup/@Teardown
>>>>>> which only get invoked once per DoFn instance (which happens
>>>>>> relatively infrequently), and you could store an instance per DoFn
>>>>>> instead of a singleton if the REST library is not thread safe.
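>>>>>> That lifecycle could be sketched like this in plain Python (the class
>>>>>> mimics a Beam DoFn's setup/teardown hooks without importing Beam, and
>>>>>> ThreadUnsafeRestClient is hypothetical):

```python
class ThreadUnsafeRestClient:
    """Hypothetical stand-in for a REST client that cannot be serialized."""
    def __init__(self):
        self.connected = True  # imagine background threads started here

    def fetch(self, key):
        return {"key": key}

    def close(self):
        self.connected = False


class EnrichFn:
    """Mimics a Beam DoFn: the client is created in setup(), not in the
    constructor, so it is never serialized with the DoFn, and there is one
    client per DoFn instance (safe even if the client is not thread safe)."""
    def __init__(self):
        self.client = None  # hold nothing non-serializable before setup

    def setup(self):
        # Invoked once per DoFn instance, which is relatively infrequent.
        self.client = ThreadUnsafeRestClient()

    def process(self, element):
        yield (element, self.client.fetch(element))

    def teardown(self):
        self.client.close()


fn = EnrichFn()
fn.setup()
results = [r for e in [1, 2] for r in fn.process(e)]
fn.teardown()
assert results == [(1, {"key": 1}), (2, {"key": 2})]
assert not fn.client.connected
```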
>>>>>>
>>>>>> On Wed, Jul 5, 2017 at 2:45 PM, Randal Moore <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a step in my beam pipeline that needs some data from a rest
>>>>>>> service. The data acquired from the rest service is dependent on the
>>>>>>> context of the data being processed and relatively large. The rest 
>>>>>>> client I
>>>>>>> am using isn't serializable - nor is it likely possible to make it so
>>>>>>> (background threads, etc.).
>>>>>>>
>>>>>>> #1 What are the practical limits to the size of side inputs (e.g., I
>>>>>>> could try to gather all the data from the rest service and provide it 
>>>>>>> as a
>>>>>>> side-input)?
>>>>>>>
>>>>>>> #2 Assuming that using the rest client is the better option, would a
>>>>>>> singleton instance be safe way to instantiate the rest client?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> rdm
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>
