That makes sense. Keeping the main input as [int, int] and using the side input to provide the [int => data] mapping needed to compute the [int, int, float] seems like a good approach.
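For concreteness, a minimal Java sketch of that shape might look like the following, assuming the Beam Java SDK; the Data type and computeScore here are only stand-ins for your actual database payload and float computation:

import java.io.Serializable;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SideInputLookup {
  /** Hypothetical per-index payload standing in for the real database record. */
  public static class Data implements Serializable { }

  /** Placeholder for the actual float computation over two Data values. */
  public static float computeScore(Data a, Data b) { return 0f; }

  /** [int, int] main input + [int => Data] side input -> "left,right,score" triples. */
  public static PCollection<String> scorePairs(
      PCollection<KV<Integer, Integer>> pairs,
      PCollection<KV<Integer, Data>> indexToData) {

    // Materialize the [int => data] mapping as a map-shaped side input.
    final PCollectionView<Map<Integer, Data>> dataView =
        indexToData.apply(View.<Integer, Data>asMap());

    return pairs.apply("ScorePairs",
        ParDo.of(new DoFn<KV<Integer, Integer>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Look up both ints of the pair in the map side input.
            Map<Integer, Data> lookup = c.sideInput(dataView);
            KV<Integer, Integer> pair = c.element();
            float score = computeScore(lookup.get(pair.getKey()),
                                        lookup.get(pair.getValue()));
            c.output(pair.getKey() + "," + pair.getValue() + "," + score);
          }
        }).withSideInputs(dataView));
  }
}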
If there are enough distinct ints and none of them are hot keys, I would perform a GBK so that at least one of your side input lookups is really easy to cache, transforming your pipeline to:

[int, int] -> GBK -> [int, iterable<int>] -> DoFn(for each [int, int] pair, perform computation) -> [int, int, float]

If you do have hot keys, then group by a key formed by concatenating one of the ints with a number in [0, Y), which allows you to distribute the work Y ways. Keeping Y small will improve caching; a larger Y helps with hot keys.

On Fri, Jul 7, 2017 at 8:26 AM, Randal Moore <[email protected]> wrote:

> Sorry for being confusing - I am still grasping at the correct semantics to use to refer to some of the things. I think that made a mess of the question.
>
> I think #2 above means that I'd like the behavior of the side input map running on Dataflow. I will give that a try.
>
> Let me make another attempt at the confusing question. My pipeline source is a large number of [Int, Int] pairs and the pipeline transforms that to [Int, Int, Float]. To compute the Float, I need a relatively large set of data for each integer. A given input integer value might occur [hundreds of] thousands of times within a given "window". The *assumed bad* design variant (at least what I assume is bad) is to transform the [Int, Int] pairs into [Int => Data, Int => Data] and then perform the calculation that produces the [Int, Int, Float]. That seems like it would transform the "window" from KB or MB to TB - with tons of redundant data within the overall flow.
>
> I had been assuming that the side input map of the large data sets would be better optimized by Dataflow. Just wanted to confirm that.
>
> Does that make any sense?
> rdm
>
> On Thu, Jul 6, 2017 at 10:38 AM Lukasz Cwik <[email protected]> wrote:
>
>> #1: For all runners, the side input needs to be ready (data needs to exist for the given window) before the main input is executed, which means that in your case the whole side input will be materialized before the main input is executed.
>>
>> #2: For Dataflow, a map/multimap-based side input is loaded lazily in parts, based upon which key is being accessed. Each segment of the map is cached in memory (using an LRU policy), and loading the data remotely is the largest cost in such a system. Depending on how large your main input is, performing a group by key on your access key will speed up your lookups (because you'll get a lot more cache hits), but you have to weigh the cost of doing the GBK against the speed-up in side input usage.
>>
>> What do you mean by "expanding the tuples to the expanded data"?
>> * Are you trying to say that typically you'll look up the same value 100+ times from the side input?
>> ** In this case, performing a GBK based upon your lookup key may be of benefit.
>> * Are you trying to say that you could have the data stored within the side input instead of just the index, but it would be 100 times larger?
>> ** A map-based side input whose values are 4 bytes vs 400 bytes isn't going to change much in lookup cost.
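Concretely, the GBK shape from the top of this reply might look roughly like the sketch below, reusing the hypothetical Data/computeScore stand-ins from the earlier snippet; grouping by the first int is what buys the extra cache hits described in #2 above:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class GroupedLookup {
  /**
   * [int, int] -> GBK -> [int, iterable<int>] -> DoFn -> "left,right,score".
   * Grouping by the first int means its side-input segment is fetched once per
   * group rather than once per element, so it caches well.
   */
  public static PCollection<String> scoreGroupedPairs(
      PCollection<KV<Integer, Integer>> pairs,
      final PCollectionView<Map<Integer, SideInputLookup.Data>> dataView) {

    return pairs
        .apply(GroupByKey.<Integer, Integer>create())
        .apply("ScoreGroup",
            ParDo.of(new DoFn<KV<Integer, Iterable<Integer>>, String>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                Map<Integer, SideInputLookup.Data> lookup = c.sideInput(dataView);
                Integer left = c.element().getKey();
                // One lookup for the whole group; only the second int varies below.
                SideInputLookup.Data leftData = lookup.get(left);
                for (Integer right : c.element().getValue()) {
                  float score = SideInputLookup.computeScore(leftData, lookup.get(right));
                  c.output(left + "," + right + "," + score);
                }
              }
            }).withSideInputs(dataView));
  }
}

If some ints are hot, the same shape works after re-keying each pair to something like left + "-" + (right % Y) before the GroupByKey, splitting each hot group Y ways, and ignoring the suffix when doing the lookup.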
>> On Wed, Jul 5, 2017 at 6:22 PM, Randal Moore <[email protected]> wrote:
>>
>>> Based on my understanding so far, I'm targeting Dataflow with a batch pipeline. Just starting to experiment with the setup/teardown with the local runner - that might work fine.
>>>
>>> Somewhat intrigued with the side inputs, though. The pipeline might iterate over 1,000,000 tuples of two integers. The integers are indices into a database of data. A given integer will be repeated in the inputs many times. Am I prematurely optimizing by ruling out expanding the tuples into the expanded data, given that each value might be expanded 100 or more times? As side inputs, it might expand to ~100GB. Expanding the input would be significantly bigger.
>>>
>>> #1 How does Dataflow schedule the pipeline with a map side input - does it wait until the whole map is collected?
>>> #2 Can the DoFn specify that it depends on only specific keys of the side input map? Does that affect the scheduling of the DoFn?
>>>
>>> Thanks for any pointers...
>>> rdm
>>>
>>> On Wed, Jul 5, 2017 at 4:58 PM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> That should have said:
>>>> ~100s MiBs per window in streaming pipelines
>>>>
>>>> On Wed, Jul 5, 2017 at 2:58 PM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> #1, side input supported sizes and performance are specific to a runner. For example, I know that Dataflow supports side inputs which are 1+ TiB (aggregate) in batch pipelines and ~100s MiBs per window, because there have been several one-off benchmarks/runs. What kinds of sizes/use cases do you want to support? Some runners will do a much better job with really small side inputs, while others will be better with really large side inputs.
>>>>>
>>>>> #2, this depends on which library you're using to perform the REST calls and whether it is thread safe. DoFns can be shared across multiple bundles and can contain methods marked with @Setup/@Teardown which only get invoked once per DoFn instance (which happens relatively infrequently), and you could store an instance per DoFn instead of a singleton if the REST library was not thread safe.
>>>>>
>>>>> On Wed, Jul 5, 2017 at 2:45 PM, Randal Moore <[email protected]> wrote:
>>>>>
>>>>>> I have a step in my Beam pipeline that needs some data from a REST service. The data acquired from the REST service is dependent on the context of the data being processed and relatively large. The REST client I am using isn't serializable - nor is it likely possible to make it so (background threads, etc.).
>>>>>>
>>>>>> #1 What are the practical limits to the size of side inputs (e.g., I could try to gather all the data from the REST service and provide it as a side input)?
>>>>>>
>>>>>> #2 Assuming that using the REST client is the better option, would a singleton instance be a safe way to instantiate the REST client?
>>>>>>
>>>>>> Thanks,
>>>>>> rdm
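To sketch the @Setup/@Teardown approach mentioned above: the DoFn below creates the non-serializable client once per DoFn instance instead of trying to serialize it with the DoFn. RestClient here is only a hypothetical stand-in for whatever REST library you are actually using:

import org.apache.beam.sdk.transforms.DoFn;

public class EnrichViaRestFn extends DoFn<Integer, String> {

  /** Hypothetical stand-in for the real, non-serializable REST client library. */
  static class RestClient {
    static RestClient connect(String baseUrl) { return new RestClient(); }
    String fetch(int index) { return "data-for-" + index; }
    void close() { }
  }

  // transient so the runner never tries to serialize the client with the DoFn.
  private transient RestClient client;

  @Setup
  public void setup() {
    // Runs once per DoFn instance (instances are reused across bundles),
    // so connection/thread setup happens relatively infrequently.
    client = RestClient.connect("https://example.invalid/api");
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Reuse the same client for every element this instance processes.
    c.output(c.element() + ":" + client.fetch(c.element()));
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
    }
  }
}

If the real library is thread safe, a process-wide singleton works too; if it is not, one client per DoFn instance as above stays safe while still being created relatively infrequently.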
