I like the way this looks. I'll do some experimenting with the GBK idea!

Thanks,
rdm
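For reference, a rough sketch of the GBK-then-lookup shape described in the quoted reply below, assuming the Beam Java SDK. `pairs`, `indexData`, `MyData`, and `compute()` are placeholders for the [int, int] source, the per-index database records, and the float calculation; none of them are defined anywhere in this thread.

```java
import java.util.Map;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// pairs:     PCollection<KV<Integer, Integer>> -- the [int, int] main input (placeholder)
// indexData: PCollection<KV<Integer, MyData>>  -- hypothetical mapping from each index to its large record

// View the lookup data as a map side input so the runner can load it lazily by key.
PCollectionView<Map<Integer, MyData>> lookupView =
    indexData.apply(View.<Integer, MyData>asMap());

PCollection<KV<KV<Integer, Integer>, Float>> results =
    pairs
        // [int, int] -> [int, iterable<int>]: one side-input lookup per distinct left key.
        .apply(GroupByKey.<Integer, Integer>create())
        .apply(ParDo.of(new DoFn<KV<Integer, Iterable<Integer>>, KV<KV<Integer, Integer>, Float>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            Map<Integer, MyData> lookup = c.sideInput(lookupView);
            MyData left = lookup.get(c.element().getKey());
            for (Integer other : c.element().getValue()) {
              MyData right = lookup.get(other);
              // compute() is a placeholder for the actual float calculation.
              c.output(KV.of(KV.of(c.element().getKey(), other), compute(left, right)));
            }
          }
        }).withSideInputs(lookupView));

// For hot keys, the grouping key could instead be something like KV.of(key, random.nextInt(Y)),
// spreading the work for a single hot key across Y groups, as described below.
```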
On Fri, Jul 7, 2017 at 11:39 AM Lukasz Cwik <[email protected]> wrote:

> That makes sense. Keeping the main input as [int, int] and using the side input to provide the mapping from [int => data] to be able to compute the [int, int, float] seems like a good approach.
>
> If there are enough distinct ints and none of them are hot keys, I would perform a GBK so at least one of your side input lookups is really easy to cache, transforming your pipeline to:
> [int, int] -> GBK -> [int, iterable<int>] -> DoFn(for each [int, int] pair, perform computation) -> [int, int, float]
> If you do have hot keys, then grouping by a key built by concatenating one of the ints with a number between [0, Y) lets you distribute the work Y ways. Keeping Y small will improve caching; a larger Y helps with hot keys.
>
> On Fri, Jul 7, 2017 at 8:26 AM, Randal Moore <[email protected]> wrote:
>
>> Sorry for being confusing - I am still grasping at the correct semantics to use to refer to some of these things, and I think that made a mess of the question.
>>
>> I think #2 above means that I'd get the behavior of the side input map running on Dataflow. I will give that a try.
>>
>> Let me make another attempt at the confusing question. My pipeline source is a large number of [Int, Int] pairs and the pipeline transforms that to [Int, Int, Float]. To compute the Float, I need a relatively large set of data for each integer. A given input integer value might occur [hundreds of] thousands of times within a given "window". The *assumed bad* design variant (at least what I assume is bad) is to transform the [Int, Int] pairs into [Int => Data, Int => Data] and then perform the calculation that produces the [Int, Int, Float]. That seems like it would transform the "window" from KB or MB to TB - with tons of redundant data within the overall flow.
>>
>> I had been assuming that the side input map of the large data sets would be better optimized by Dataflow. Just wanted to confirm that.
>>
>> Does that make any sense?
>> rdm
>>
>> On Thu, Jul 6, 2017 at 10:38 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> #1: For all runners, the side input needs to be ready (data needs to exist for the given window) before the main input is executed, which means that in your case the whole side input will be materialized before the main input is executed.
>>>
>>> #2: For Dataflow, a map/multimap based side input is loaded lazily in parts based upon which key is being accessed. Each segment of the map is cached in memory (using an LRU policy), and loading the data remotely is the largest cost in such a system. Depending on how large your main input is, performing a group by key on your access key will speed up your lookups (because you'll get a lot more cache hits), but you have to weigh the cost of doing the GBK against the speed-up in side input usage.
>>>
>>> What do you mean by "expanding the tuples to the expanded data"?
>>> * Are you trying to say that typically you'll look up the same value 100+ times from the side input?
>>> ** In this case performing a GBK based upon your lookup key may be of benefit.
>>> * Are you trying to say that you could have the data stored within the side input instead of just the index, but it would be 100 times larger?
>>> ** A map based side input whose values are 4 bytes vs 400 bytes isn't going to change the lookup cost much.
>>>
>>> On Wed, Jul 5, 2017 at 6:22 PM, Randal Moore <[email protected]> wrote:
>>>
>>>> Based on my understanding so far, I'm targeting Dataflow with a batch pipeline. Just starting to experiment with the setup/teardown with the local runner - that might work fine.
>>>>
>>>> Somewhat intrigued with the side inputs, though. The pipeline might iterate over 1,000,000 tuples of two integers. The integers are indices into a database of data. A given integer will be repeated in the inputs many times. Am I prematurely optimizing if I rule out expanding the tuples to the expanded data, given that each value might be expanded 100 or more times? As side inputs, it might expand to ~100GB. Expanding the main input would be significantly bigger.
>>>>
>>>> #1 How does Dataflow schedule the pipeline with a map side input - does it wait until the whole map is collected?
>>>> #2 Can the DoFn specify that it depends on only specific keys of the side input map? Does that affect the scheduling of the DoFn?
>>>>
>>>> Thanks for any pointers...
>>>> rdm
>>>>
>>>> On Wed, Jul 5, 2017 at 4:58 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> That should have said:
>>>>> ~100s MiBs per window in streaming pipelines
>>>>>
>>>>> On Wed, Jul 5, 2017 at 2:58 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> #1, supported side input sizes and performance are specific to a runner. For example, I know that Dataflow supports side inputs which are 1+ TiB (aggregate) in batch pipelines and ~100s MiBs per window because there have been several one-off benchmarks/runs. What kinds of sizes/use cases do you want to support? Some runners will do a much better job with really small side inputs while others will be better with really large side inputs.
>>>>>>
>>>>>> #2, this depends on which library you're using to perform the REST calls and whether it is thread safe. DoFns can be shared across multiple bundles and can contain methods marked with @Setup/@Teardown which only get invoked once per DoFn instance (which is relatively infrequent), so you could store an instance per DoFn instead of a singleton if the REST library is not thread safe.
>>>>>>
>>>>>> On Wed, Jul 5, 2017 at 2:45 PM, Randal Moore <[email protected]> wrote:
>>>>>>
>>>>>>> I have a step in my Beam pipeline that needs some data from a REST service. The data acquired from the REST service is dependent on the context of the data being processed and relatively large. The REST client I am using isn't serializable - nor is it likely possible to make it so (background threads, etc.).
>>>>>>>
>>>>>>> #1 What are the practical limits to the size of side inputs (e.g., I could try to gather all the data from the REST service and provide it as a side input)?
>>>>>>>
>>>>>>> #2 Assuming that using the REST client is the better option, would a singleton instance be a safe way to instantiate the REST client?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> rdm
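The @Setup/@Teardown pattern mentioned in the thread might look roughly like the sketch below, again assuming the Beam Java SDK. `MyRestClient` and its `create`/`score`/`close` methods are stand-ins for whatever non-serializable REST client is actually in use; they are not from this thread.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Each worker builds its own DoFn instances, and a given instance is only used by
// one thread at a time, so a per-instance client avoids both the serialization
// problem and the thread-safety problem without a shared singleton.
class ScorePairsFn extends DoFn<KV<Integer, Integer>, KV<KV<Integer, Integer>, Float>> {
  // transient: the client is never serialized with the DoFn; it is built on the worker.
  private transient MyRestClient client;

  @Setup
  public void setup() {
    // Invoked once per DoFn instance, which is then reused across many bundles.
    client = MyRestClient.create();  // hypothetical factory for the REST client
  }

  @ProcessElement
  public void process(ProcessContext c) {
    KV<Integer, Integer> pair = c.element();
    // Hypothetical REST call standing in for fetching the per-index data and computing the float.
    float score = client.score(pair.getKey(), pair.getValue());
    c.output(KV.of(pair, score));
  }

  @Teardown
  public void teardown() throws Exception {
    // Best-effort cleanup of the client's background threads/connections.
    if (client != null) {
      client.close();
    }
  }
}
```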
