Github user douglaz commented on the pull request:

    https://github.com/apache/spark/pull/813#issuecomment-43659430
  
    It isn't just about lines of code, it is about pollution of code using 
`asInstanceOf` and runtime errors because of this and wrong pattern matching on 
Sequences.
    
    Compare this almost-real-code using `cogroup`:
    
    ```scala
    val userHistories = parsedViews.cogroup(parsedBuyOrders, 
parsedShoppingCarts, parsedSentMails, partitioner=context.partitioner)
      .map(values => {
        val (key, events) = values
        val (groupedViews, groupedBuyOrders, groupedShoppingCarts, 
groupedSentMails) = events
    
        val sentMailsProducts = groupedSentMails.flatMap(_.products)
    
        val validViews = groupedViews.filter(v => 
!sentMailsProducts.contains(v.productId))
    
        key -> UserHistory(validViews, groupedBuyOrders, groupedShoppingCarts, 
groupedSentMails)
      })
    ```
    
    With this using `CoGroupedRDD`:
    
    ```scala
    // Perhaps there is some mistake here, a RDD may be missing
    val userHistories = new CoGroupedRDD(Seq(parsedViews, parsedBuyOrders, 
parsedShoppingCarts, parsedSentMails), part=context.partitioner)
      .map(values => {
        val (key, events) = values
      
        // Or the match is wrong here
        val Seq(_groupedViews, _groupedBuyOrders, _groupedShoppingCarts, 
_groupedSentMails) = events
      
        // Or here we are casting with the wrong type. We'll find out at runtime
        val groupedViews = _groupedViews.asInstanceOf[Seq[UHView]]
        val groupedBuyOrders = _groupedBuyOrders.asInstanceOf[Seq[UHBuyOrder]]
        val groupedShoppingCarts = 
_groupedShoppingCarts.asInstanceOf[Seq[UHShoppingCartLog]]
        val groupedSentMails = _groupedSentMails.asInstanceOf[Seq[UHSentMail]]
      
        val sentMailsProducts = groupedSentMails.flatMap(_.products)
      
        val validViews = groupedViews.filter(v => 
!sentMailsProducts.contains(v.productId))
      
        key -> UserHistory(validViews, groupedBuyOrders, groupedShoppingCarts, 
groupedSentMails)
      })
    ```
    
    The second example is clearly more verbose and error-prone.
    
    Comparing `cogroup` with union misses the point:
    - `cogroup` may be called using different types and it keeps them thanks to 
the tuple signature. With union there is just one type. 
    - `rdd1.union(rdd2).union(rdd3)` works very well and is transparent to the 
user of the resulting RDD, while `rdd1.cogroup(rdd2).cogroup(rdd3)` will be 
very different from `rdd1.cogroup(rdd2, rdd3)`. The composition works fine for 
`union` but for `cogroup` we start to get `Seq[Seq[` and of course we may have 
performance implications.
    
    A more fair comparison would be with `join` because it also keeps different 
types and the composition will create tuple of tuples. But in this case I find 
it very easy and safe to unpack such tuples. It isn't ideal but better than 
`cogroup` in the same situation. Of course I wouldn't oppose to create a 
interface for joins with more elements.
    
    But I agree that we should really discuss this. If such operations won't 
get in main Spark, then external libraries (using implicits) will be created to 
handle such cases. I think it would be better if Spark could handle such cases 
without letting the user deal with boilerplate or resorting to external 
libraries.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to