Maybe you can override HadoopRDD's compute method to do that?
On Mon, Oct 21, 2013 at 8:16 AM, Ameet Kini <[email protected]> wrote:
> Right, except both my sequence files are large, so doing a "collect()"
> and then broadcasting one of them would be costly. Since I have two large
> sorted sequence files with a one-to-one relationship among the keys, I need
> to perform the "merge" portion of a good old "sort-merge" join. And it is
> actually a very simple merge, since each key is unique within the file.
>
> I was looking at the mapPartitions API:
> def mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning:
> Boolean)(implicit arg0: ClassManifest[U]): RDD[U]
> <http://spark.incubator.apache.org/docs/latest/api/core/org/apache/spark/rdd/RDD.html>
>
> If somehow the function f has access to the underlying partition
> information (e.g., HadoopPartition.inputSplit), then it could open a reader
> on the actual HDFS file corresponding to that inputSplit and manually do
> the join. But it looks like HadoopPartition is declared private. Is there
> some other way to figure out which underlying HDFS file corresponds to the
> partition being iterated upon in mapPartitions?
>
> Ameet
>
> On Mon, Oct 21, 2013 at 12:54 AM, Reynold Xin <[email protected]> wrote:
>
>> How about the following:
>>
>> // K, V, W stand for the actual key and value types
>> val smallFile = sc.sequenceFile[K, V](....).collect()
>> val largeFile = sc.sequenceFile[K, W](...)
>>
>> val small = sc.broadcast(smallFile)
>> largeFile.mapPartitions { iter =>
>>   // build up a hash table for small, once per partition
>>   val smallTable = small.value.toMap
>>   iter.filter { case (k, _) => smallTable.contains(k) }.map { case (k, w) =>
>>     (k, (smallTable(k), w))  // join the small-side value with the row itself
>>   }
>> }
>>
>> On Fri, Oct 18, 2013 at 2:22 PM, Ameet Kini <[email protected]> wrote:
>>
>>> Forgot to add an important point. My sequence files are sorted (they're
>>> actually Hadoop map files). Since they're sorted, it makes sense to do a
>>> fetch at the partition level of the inner sequence file.
>>>
>>> Thanks,
>>> Ameet
>>>
>>> On Fri, Oct 18, 2013 at 5:20 PM, Ameet Kini <[email protected]> wrote:
>>>
>>>> I've seen discussions where the suggestion is to do a map-side join,
>>>> but haven't seen an example yet, and could certainly use one. I have two
>>>> sequence files where the key is unique within each file, so the join is
>>>> a one-to-one join and can hence benefit from a map-side join. However,
>>>> both sequence files can be large, so reading one of them completely in
>>>> the driver and broadcasting it out would be expensive.
>>>>
>>>> I don't think there is a map-side join implementation in Spark, but
>>>> earlier suggestions have been to write one using mapPartitions on one of
>>>> the operands as the outer loop. If that is the case, how would I fetch
>>>> the split corresponding to the keys in the outer's partition? I'd prefer
>>>> to do a fetch-per-partition rather than a fetch-per-tuple.
>>>>
>>>> In any case, some feedback and, preferably, an example of a map-side
>>>> join without broadcasting would help.
>>>>
>>>> Thanks,
>>>> Ameet
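The "merge" portion of the sort-merge join Ameet describes in this thread can be written as a plain function over two sorted iterators, independent of how the matching partitions are obtained. This is a minimal sketch, not a Spark API: `SortedMergeJoin` and `mergeJoin` are hypothetical names, and it assumes both inputs are sorted ascending under the same `Ordering` and that each key appears at most once per input (the one-to-one case discussed here).

```scala
// Hypothetical helper (not part of Spark): the "merge" step of a sort-merge join.
// Assumes both inputs are sorted ascending by key under the same Ordering
// and that each key occurs at most once per input (one-to-one join).
object SortedMergeJoin {
  def mergeJoin[K, A, B](left: Iterator[(K, A)], right: Iterator[(K, B)])(
      implicit ord: Ordering[K]): Iterator[(K, (A, B))] = {
    val l = left.buffered
    val r = right.buffered

    new Iterator[(K, (A, B))] {
      // Skip past the side with the smaller head key until both heads
      // share a key or one side runs out. Keys present on only one
      // side are dropped (inner-join semantics).
      private def align(): Unit = {
        var matched = false
        while (!matched && l.hasNext && r.hasNext) {
          val c = ord.compare(l.head._1, r.head._1)
          if (c < 0) l.next()
          else if (c > 0) r.next()
          else matched = true
        }
      }

      def hasNext: Boolean = { align(); l.hasNext && r.hasNext }

      def next(): (K, (A, B)) = {
        if (!hasNext) throw new NoSuchElementException("no more matching keys")
        val (k, a) = l.next()
        val (_, b) = r.next()
        (k, (a, b))
      }
    }
  }
}
```

Each input is read once, in order, with only one buffered record per side, so memory stays constant regardless of partition size. How to obtain the two matching partitions is the open question in the thread. If both RDDs had the same number of partitions, with partition i of each covering the same key range (an assumption; nothing above guarantees it for map files split by their input format), a per-partition-pair operator such as `zipPartitions`, in Spark versions that provide it, could call `mergeJoin` on each pair. If the iterators come straight from a Hadoop record reader, keys and values may be reused `Writable` objects, so copying them into immutable values first is safer.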
