Probably makes sense to discuss this on GitHub next to the code instead,
but I agree that fanciness like Bloom filters would be appreciated.

I've got the cogroup proposal working with tests here:
https://github.com/apache/incubator-spark/pull/506
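
For anyone following along, roughly the shape under discussion (a minimal
sketch, not necessarily the exact code in the PR; assumes A and B are
RDD[String]):

// Keep only keys that appear on both sides of the cogroup, then drop the
// dummy values. The filter is what turns "union of keys" into "intersection".
val intersection = A.map(v => (v, null))
  .cogroup(B.map(v => (v, null)))
  .filter { case (_, (left, right)) => left.nonEmpty && right.nonEmpty }
  .keys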


On Fri, Jan 24, 2014 at 12:09 AM, Josh Rosen <[email protected]> wrote:

> For cases where you expect the intersection to be small, we might be able
> to use Bloom filters to prune out tuples that can't possibly be in the
> intersection in order to reduce the amount of data that we need to shuffle.
>
> Implementation-wise, this would run into similar issues as sortByKey()
> with respect to laziness since it would involve an action to collect and
> broadcast the Bloom filters.
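>
> As a rough sketch of that pruning step (using Guava's BloomFilter purely as
> an example filter, and building it on the driver for simplicity, which is
> exactly the eager collect/broadcast step mentioned above; assumes sc is the
> SparkContext and A, B are RDD[String]):
>
> import java.nio.charset.StandardCharsets
> import com.google.common.hash.{BloomFilter, Funnels}
>
> // Build a Bloom filter over B's keys and broadcast it, so elements of A that
> // can't possibly be in the intersection are dropped before the shuffle.
> val bKeys = B.distinct().collect()
> val filter = BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), bKeys.length)
> bKeys.foreach(k => filter.put(k))
> val filterBc = sc.broadcast(filter)
> val prunedA = A.filter(k => filterBc.value.mightContain(k))
>
> False positives only mean a few extra records get shuffled; correctness still
> comes from the cogroup or join afterwards.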
>
>
> On Thu, Jan 23, 2014 at 10:58 PM, Matei Zaharia <[email protected]> wrote:
>
>> I know some other places used null; haven’t seen None but it might exist.
>>
>> Join actually uses cogroup internally right now so it will be at least as
>> slow as that, but the problem is that it will generate lots of pairs of
>> objects if there are multiple items in both datasets with the same key
>> (unlikely if you really are using them as sets, but could happen).
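>>
>> A tiny hypothetical example of that blow-up (assuming a SparkContext sc):
>>
>> val a = sc.parallelize(Seq("x", "x", "x")).map(v => (v, null))
>> val b = sc.parallelize(Seq("x", "x", "x")).map(v => (v, null))
>> a.join(b).count()     // 9: one output record per matching pair for the key
>> a.cogroup(b).count()  // 1: a single (key, (seq, seq)) record for the key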
>>
>> Matei
>>
>> On Jan 23, 2014, at 10:50 PM, Evan Sparks <[email protected]> wrote:
>>
>> If the intersection is really big, would join be better?
>>
>> Agreed on "null" vs None -but how frequent is this in the current
>> codebase?
>>
>> On Jan 23, 2014, at 10:38 PM, Matei Zaharia <[email protected]> wrote:
>>
>> You’d have to add a filter after the cogroup too. Cogroup gives you (key,
>> (list of values in RDD 1, list in RDD 2)).
>>
>> Also, one small thing: instead of setting the value to None, it may be
>> cheaper to use null.
>>
>> Matei
>>
>> On Jan 23, 2014, at 10:30 PM, Andrew Ash <[email protected]> wrote:
>>
>> You mean cogroup like this?
>>
>> A.map(v => (v,None)).cogroup(B.map(v => (v,None))).keys
>>
>> If so I might send a PR to start code review for getting this into master.
>>
>> Good to know about the strategy for sharding RDDs and for the core
>> operations.
>>
>> Thanks!
>> Andrew
>>
>>
>> On Thu, Jan 23, 2014 at 11:17 PM, Matei Zaharia <[email protected]> wrote:
>>
>>> Using cogroup would probably be slightly more efficient than join
>>> because you don’t have to generate every pair of keys for elements that
>>> occur in each dataset multiple times.
>>>
>>> We haven’t tried to explicitly separate the API between “core” methods
>>> and others, but in practice, everything can be built on mapPartitions and
>>> cogroup for transformations, and SparkContext.runJob (internal method) for
>>> actions. What really matters is actually the level at which the code sees
>>> dependencies in the DAGScheduler, which is done through the Dependency
>>> class. There are only two types of dependencies (narrow and shuffle), which
>>> correspond to those operations above. So in a sense there is this
>>> separation at the lowest level. But for the levels above, the goal was
>>> first and foremost to make the API as usable as possible, which meant
>>> giving people quick access to all the operations that might be useful, and
>>> dealing with how we’ll implement those later. Over time it will be possible
>>> to divide things like RDD.scala into multiple traits if they become
>>> unwieldy.
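>>>
>>> As a trivial illustration of that layering (a sketch with hypothetical
>>> names; not how RDD.map is actually written), a narrow transformation like
>>> map can be expressed directly on top of mapPartitions:
>>>
>>> import scala.reflect.ClassTag
>>> import org.apache.spark.rdd.RDD
>>>
>>> // map as a thin wrapper over mapPartitions (a narrow dependency)
>>> def mapViaPartitions[T, U: ClassTag](rdd: RDD[T])(f: T => U): RDD[U] =
>>>   rdd.mapPartitions(iter => iter.map(f))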
>>>
>>> Matei
>>>
>>>
>>> On Jan 23, 2014, at 9:40 PM, Andrew Ash <[email protected]> wrote:
>>>
>>> And I think the followup to Ian's question:
>>>
>>> Is there a way to implement .intersect() in the core API that's more
>>> efficient than the .join() method Evan suggested?
>>>
>>> Andrew
>>>
>>>
>>> On Thu, Jan 23, 2014 at 10:26 PM, Ian O'Connell <[email protected]> wrote:
>>>
>>>> Is there any separation in the API between functions that can be built
>>>> solely on the existing exposed public API and ones which require access to
>>>> internals?
>>>>
>>>> Just to maybe avoid bloat for composite functions like this that are
>>>> for user convenience?
>>>>
>>>> (A la something like Lua's aux API vs core API?)
>>>>
>>>>
>>>> On Thu, Jan 23, 2014 at 8:33 PM, Matei Zaharia <[email protected]> wrote:
>>>>
>>>>> I’d be happy to see this added to the core API.
>>>>>
>>>>> Matei
>>>>>
>>>>> On Jan 23, 2014, at 5:39 PM, Andrew Ash <[email protected]> wrote:
>>>>>
>>>>> Ah right of course -- perils of typing code without running it!
>>>>>
>>>>> It feels like this is a pretty core operation that should be added to
>>>>> the main RDD API.  Do other people not run into this often?
>>>>>
>>>>> When I'm validating a foreign key join on my cluster, I often check that
>>>>> the foreign keys land on valid values in the referenced table, and the way
>>>>> I do that is by checking what percentage of the references actually land.
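>>>>>
>>>>> Roughly like this, with hypothetical names (fks and referenced are RDD[String]):
>>>>>
>>>>> // Fraction of distinct foreign key values that actually exist in the
>>>>> // referenced table; an intersection in all but name.
>>>>> val distinctFks = fks.distinct()
>>>>> val matched = distinctFks.map(k => (k, null))
>>>>>   .join(referenced.distinct().map(k => (k, null)))
>>>>>   .count()
>>>>> val coverage = matched.toDouble / distinctFks.count()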
>>>>>
>>>>>
>>>>> On Thu, Jan 23, 2014 at 6:36 PM, Evan R. Sparks <[email protected]> wrote:
>>>>>
>>>>>> Yup (well, with _._1 at the end!)
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 23, 2014 at 5:28 PM, Andrew Ash <[email protected]> wrote:
>>>>>>
>>>>>>> You're thinking like this?
>>>>>>>
>>>>>>> A.map(v => (v,None)).join(B.map(v => (v,None))).map(_._2)
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 23, 2014 at 6:26 PM, Evan R. Sparks <[email protected]> wrote:
>>>>>>>
>>>>>>>> You could map each to an RDD[(String,None)] and do a join.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 23, 2014 at 5:18 PM, Andrew Ash <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi spark users,
>>>>>>>>>
>>>>>>>>> I recently wanted to calculate the set intersection of two RDDs of
>>>>>>>>> Strings.  I couldn't find a .intersection() method in the autocomplete
>>>>>>>>> or in the Scala API docs, so I used a little set theory to end up with
>>>>>>>>> this:
>>>>>>>>>
>>>>>>>>> lazy val A = ...
>>>>>>>>> lazy val B = ...
>>>>>>>>> A.union(B).subtract(A.subtract(B)).subtract(B.subtract(A))
>>>>>>>>>
>>>>>>>>> Which feels very cumbersome.
>>>>>>>>>
>>>>>>>>> Does anyone have a more idiomatic way to calculate intersection?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Andrew
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>>
>
