It probably makes sense to discuss this on GitHub next to the code instead, but I agree that fancier touches like Bloom filters would be appreciated.
I've got the cogroup proposal working with tests here: https://github.com/apache/incubator-spark/pull/506

On Fri, Jan 24, 2014 at 12:09 AM, Josh Rosen <[email protected]> wrote:

> For cases where you expect the intersection to be small, we might be able to use Bloom filters to prune out tuples that can't possibly be in the intersection, in order to reduce the amount of data that we need to shuffle.
>
> Implementation-wise, this would run into similar issues as sortByKey() with respect to laziness, since it would involve an action to collect and broadcast the Bloom filters.
>
> On Thu, Jan 23, 2014 at 10:58 PM, Matei Zaharia <[email protected]> wrote:
>
>> I know some other places used null; haven’t seen None but it might exist.
>>
>> Join actually uses cogroup internally right now, so it will be at least as slow as that, but the problem is that it will generate lots of pairs of objects if there are multiple items in both datasets with the same key (unlikely if you really are using them as sets, but it could happen).
>>
>> Matei
>>
>> On Jan 23, 2014, at 10:50 PM, Evan Sparks <[email protected]> wrote:
>>
>> If the intersection is really big, would join be better?
>>
>> Agreed on "null" vs None - but how frequent is this in the current codebase?
>>
>> On Jan 23, 2014, at 10:38 PM, Matei Zaharia <[email protected]> wrote:
>>
>> You’d have to add a filter after the cogroup too. Cogroup gives you (key, (list of values in RDD 1, list of values in RDD 2)).
>>
>> Also, one small thing: instead of setting the value to None, it may be cheaper to use null.
>>
>> Matei
>>
>> On Jan 23, 2014, at 10:30 PM, Andrew Ash <[email protected]> wrote:
>>
>> You mean cogroup like this?
>>
>> A.map(v => (v,None)).cogroup(B.map(v => (v,None))).keys
>>
>> If so, I might send a PR to start code review for getting this into master.
>>
>> Good to know about the strategy for sharding RDDs and for the core operations.
>>
>> Thanks!
>> Andrew
>>
>> On Thu, Jan 23, 2014 at 11:17 PM, Matei Zaharia <[email protected]> wrote:
>>
>>> Using cogroup would probably be slightly more efficient than join, because you don’t have to generate every pair of keys for elements that occur in each dataset multiple times.
>>>
>>> We haven’t tried to explicitly separate the API between “core” methods and others, but in practice everything can be built on mapPartitions and cogroup for transformations, and on SparkContext.runJob (an internal method) for actions. What really matters is the level at which the code sees dependencies in the DAGScheduler, which is done through the Dependency class. There are only two types of dependencies (narrow and shuffle), and they correspond to those operations above. So in a sense there is this separation at the lowest level. But for the levels above, the goal was first and foremost to make the API as usable as possible, which meant giving people quick access to all the operations that might be useful and dealing with how we’ll implement them later. Over time it will be possible to divide things like RDD.scala into multiple traits if they become unwieldy.
>>>
>>> Matei
>>>
>>> On Jan 23, 2014, at 9:40 PM, Andrew Ash <[email protected]> wrote:
>>>
>>> And I think the follow-up to Ian's question:
>>>
>>> Is there a way to implement .intersect() in the core API that's more efficient than the .join() method Evan suggested?
>>>
>>> Andrew
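Putting the pieces of the thread together: with Matei's extra filter step added to the cogroup one-liner above, an intersection helper could look roughly like the sketch below. This is only a sketch under a few assumptions (the intersect name is made up, null is used as the dummy value per Matei's suggestion, and it is not necessarily the exact code in the PR linked at the top):

    import org.apache.spark.SparkContext._   // pair-RDD implicits that provide cogroup and keys
    import org.apache.spark.rdd.RDD

    // Set intersection of two RDDs via cogroup, keeping only keys that appear on both sides.
    def intersect(a: RDD[String], b: RDD[String]): RDD[String] =
      a.map(v => (v, null)).cogroup(b.map(v => (v, null)))
        .filter { case (_, (inA, inB)) => inA.nonEmpty && inB.nonEmpty }
        .keys

    // e.g. val common = intersect(A, B)

Because cogroup emits each distinct key exactly once, the result is already deduplicated, and no pairs of duplicate elements are materialized the way a join would produce them.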
>>> On Thu, Jan 23, 2014 at 10:26 PM, Ian O'Connell <[email protected]> wrote:
>>>
>>>> Is there any separation in the API between functions that can be built solely on the existing exposed public API and ones which require access to internals?
>>>>
>>>> Just to maybe avoid bloat for composite functions like this that are for user convenience?
>>>>
>>>> (A la something like Lua's aux API vs core API?)
>>>>
>>>> On Thu, Jan 23, 2014 at 8:33 PM, Matei Zaharia <[email protected]> wrote:
>>>>
>>>>> I’d be happy to see this added to the core API.
>>>>>
>>>>> Matei
>>>>>
>>>>> On Jan 23, 2014, at 5:39 PM, Andrew Ash <[email protected]> wrote:
>>>>>
>>>>> Ah, right, of course -- perils of typing code without running it!
>>>>>
>>>>> It feels like this is a pretty core operation that should be added to the main RDD API. Do other people not run into this often?
>>>>>
>>>>> When I'm validating a foreign-key join in my cluster, I often check that the foreign keys land on valid values in the referenced table, and the way I do that is by checking what percentage of the references actually land.
>>>>>
>>>>> On Thu, Jan 23, 2014 at 6:36 PM, Evan R. Sparks <[email protected]> wrote:
>>>>>
>>>>>> Yup (well, with _._1 at the end!)
>>>>>>
>>>>>> On Thu, Jan 23, 2014 at 5:28 PM, Andrew Ash <[email protected]> wrote:
>>>>>>
>>>>>>> You're thinking like this?
>>>>>>>
>>>>>>> A.map(v => (v,None)).join(B.map(v => (v,None))).map(_._2)
>>>>>>>
>>>>>>> On Thu, Jan 23, 2014 at 6:26 PM, Evan R. Sparks <[email protected]> wrote:
>>>>>>>
>>>>>>>> You could map each to an RDD[(String,None)] and do a join.
>>>>>>>>
>>>>>>>> On Thu, Jan 23, 2014 at 5:18 PM, Andrew Ash <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Spark users,
>>>>>>>>>
>>>>>>>>> I recently wanted to calculate the set intersection of two RDDs of Strings. I couldn't find a .intersection() method in autocomplete or in the Scala API docs, so I used a little set theory to end up with this:
>>>>>>>>>
>>>>>>>>> lazy val A = ...
>>>>>>>>> lazy val B = ...
>>>>>>>>> A.union(B).subtract(A.subtract(B)).subtract(B.subtract(A))
>>>>>>>>>
>>>>>>>>> which feels very cumbersome.
>>>>>>>>>
>>>>>>>>> Does anyone have a more idiomatic way to calculate an intersection?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Andrew
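Going back to Josh's Bloom-filter idea near the top of the thread, here is a very rough sketch of what the pruning might look like. Everything in it is an assumption for illustration: the bloomPrunedIntersect, expectedSize, and fpp names are made up, it leans on Guava's BloomFilter (Guava 15 or newer, for Funnels.stringFunnel(Charset) and putAll), and it has exactly the laziness issue Josh mentions, since building and broadcasting the filter is an eager action:

    import java.nio.charset.StandardCharsets
    import com.google.common.hash.{BloomFilter, Funnel, Funnels}
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.rdd.RDD

    // Build a Bloom filter over B, broadcast it, and use it to drop elements of A that
    // cannot possibly be in the intersection before any data is shuffled.
    def bloomPrunedIntersect(sc: SparkContext, a: RDD[String], b: RDD[String],
                             expectedSize: Int = 1000000,   // rough guess at |B|; tune or use b.count()
                             fpp: Double = 0.01): RDD[String] = {
      val funnel: Funnel[CharSequence] = Funnels.stringFunnel(StandardCharsets.UTF_8)

      // One filter per partition of B, merged on the driver. This is the eager action
      // that breaks laziness: B is evaluated here, before the returned RDD is ever used.
      val bloom = b.aggregate(BloomFilter.create(funnel, expectedSize, fpp))(
        (bf, s)    => { bf.put(s); bf },
        (bf1, bf2) => { bf1.putAll(bf2); bf1 })
      val bcast = sc.broadcast(bloom)

      // False positives only mean a few extra elements of A get shuffled; the cogroup
      // afterwards still computes the exact intersection of the survivors with B.
      a.filter(x => bcast.value.mightContain(x))
        .map(v => (v, null)).cogroup(b.map(v => (v, null)))
        .filter { case (_, (inA, inB)) => inA.nonEmpty && inB.nonEmpty }
        .keys
    }

Building the filter with aggregate() keeps the per-element work on the executors and only ships the small per-partition filters back to the driver, which is what makes the pruning cheaper than shuffling all of A and B.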
