Re: Map side join without broadcast
Hey there,

I think the code overcomplicates the partitioning by specifying the partition assignment explicitly, when hashing the key is already the default behaviour of the partitioner in Spark. You could simply do a partitionBy and it would use the hash partitioner by default. Let me know if I've misinterpreted the code.

I also think that using map after partitioning will cause Spark to lose the partitioner.

On Sun, 30 Jun 2019 at 20:56, jelmer wrote:
> Does something like the code below make any sense or would there be a more
> efficient way to do it?
> [...]

--
Chris
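The point about losing the partitioner can be checked directly. The following is a minimal sketch, assuming a local Spark installation on the classpath; the object name `PartitionerDemo` and the sample data are made up for illustration. Note that in the Scala RDD API `partitionBy` takes an explicit `Partitioner`, so a `HashPartitioner` is passed here.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerDemo {
  // For each derived RDD, report whether it still carries a partitioner.
  def run(sc: SparkContext): Map[String, Boolean] = {
    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
      .partitionBy(new HashPartitioner(4))

    Map(
      "partitionBy" -> pairs.partitioner.isDefined,
      // map may change the keys, so Spark drops the partitioner
      "map" -> pairs.map { case (k, v) => (k, v) }.partitioner.isDefined,
      // mapValues cannot change the keys, so the partitioner is kept
      "mapValues" -> pairs.mapValues(_.toUpperCase).partitioner.isDefined,
      // mapPartitions drops the partitioner unless told otherwise
      "mapPartitions" -> pairs.mapPartitions(it => it).partitioner.isDefined,
      "mapPartitions(preserve)" ->
        pairs.mapPartitions(it => it, preservesPartitioning = true).partitioner.isDefined
    )
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("partitioner-demo")
    val sc = new SparkContext(conf)
    try run(sc).foreach { case (op, kept) => println(s"$op keeps partitioner: $kept") }
    finally sc.stop()
  }
}
```

If the downstream join is meant to reuse the existing layout, `mapValues` or `mapPartitions` with `preservesPartitioning = true` is the usual way to keep the partitioner, provided the function really leaves the keys untouched.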
Re: Map side join without broadcast
You can implement a custom partitioner to do the bucketing.

On Sun, Jun 30, 2019 at 5:15 AM Chris Teoh wrote:
> The closest thing I can think of here is if you have both dataframes written
> out using buckets. Hive uses this technique for join optimisation such that
> both datasets of the same bucket are read by the same mapper to achieve map
> side joins.
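As a rough sketch of what such a custom partitioner could look like: the class name `PartitionIdPassthrough` is borrowed from the code posted elsewhere in this thread, but its definition is never shown, so the body below is an assumption rather than the poster's implementation. It treats the key itself as the target partition id.

```scala
import org.apache.spark.Partitioner

// Hypothetical reconstruction: the key is expected to already be a partition id.
class PartitionIdPassthrough(override val numPartitions: Int) extends Partitioner {

  override def getPartition(key: Any): Int = key match {
    case id: Int if id >= 0 && id < numPartitions => id
    case other =>
      throw new IllegalArgumentException(
        s"Expected a partition id in [0, $numPartitions), got: $other")
  }

  // Spark compares partitioners with == to decide whether two RDDs are
  // already co-partitioned, so equality is defined on the partition count.
  override def equals(other: Any): Boolean = other match {
    case p: PartitionIdPassthrough => p.numPartitions == numPartitions
    case _ => false
  }

  override def hashCode: Int = numPartitions
}
```

Defining `equals` is the part that is easy to forget: without it, two separately constructed instances would not compare equal, and a join between RDDs partitioned by them would shuffle again.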
Re: Map side join without broadcast
Does something like the code below make any sense, or would there be a more efficient way to do it?

    // Assign each word to exactly one partition
    val wordsOnOnePartition = input
      .map { word => Math.abs(word.id.hashCode) % numPartitions -> word }
      .partitionBy(new PartitionIdPassthrough(numPartitions))

    // Build one partial index per partition
    val indices = wordsOnOnePartition
      .mapPartitions(it => new IndexIterator(it, m))
      .cache()

    // Send every word to every partition
    val wordsOnEachPartition = input
      .flatMap(word => 0 until numPartitions map { partition => partition -> word })
      .partitionBy(new PartitionIdPassthrough(numPartitions))

    // Query every partial index with every word
    val nearest = indices.join(wordsOnEachPartition)
      .flatMap { case (_, (index, Word(word, vector))) =>
        index.findNearest(vector, k + 1).collect {
          case SearchResult(Word(relatedWord, _), score) if relatedWord != word =>
            RelatedItem(word, relatedWord, score)
        }
        .take(k)
      }

    // Merge the per-index results, keeping the k most similar items per word
    val result = nearest.groupBy(_.word).map { case (word, relatedItems) =>
      // parentheses so that mkString joins the word with its related words
      (word +: relatedItems.toSeq
        .sortBy(_.similarity)(Ordering[Double].reverse)
        .map(_.relatedWord)
        .take(k))
        .mkString("\t")
    }

I manually assign a partition to each word in a list of words, and repartition the RDD by this partition key.

There I use mapPartitions to construct a partial index, so I end up with one index in each partition.

Then I read the words again, but this time I assign every partition to each word and join the result with the indices RDD by partition key. So effectively every index will be queried.

Finally, I merge the results from each index into a single list, keeping only the most relevant items, by doing a groupBy.

On Sun, 30 Jun 2019 at 01:45, Chris Teoh wrote:
> The closest thing I can think of here is if you have both dataframes written
> out using buckets. Hive uses this technique for join optimisation such that
> both datasets of the same bucket are read by the same mapper to achieve map
> side joins.
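One detail in the manual partition assignment is worth a second look: `Math.abs(hash) % numPartitions` is not guaranteed to be non-negative, because `Math.abs(Int.MinValue)` is still `Int.MinValue`. The sketch below is plain Scala with no Spark dependency; the object and method names are made up for illustration. It contrasts that expression with `Math.floorMod`, which always lands in `[0, numPartitions)` for a positive partition count.

```scala
object PartitionKeys {
  // The pattern used in the post: breaks when hash == Int.MinValue,
  // because Math.abs overflows and the remainder keeps the negative sign.
  def absMod(hash: Int, numPartitions: Int): Int =
    Math.abs(hash) % numPartitions

  // floorMod returns a result with the sign of the divisor, so the
  // partition id is always in [0, numPartitions) when numPartitions > 0.
  def safeMod(hash: Int, numPartitions: Int): Int =
    Math.floorMod(hash, numPartitions)

  def main(args: Array[String]): Unit = {
    println(absMod(Int.MinValue, 10))  // negative, not a valid partition id
    println(safeMod(Int.MinValue, 10)) // within 0 until 10
  }
}
```

A partitioner that receives a negative id would fail at runtime, so the `floorMod` variant is the safer choice when keys come from arbitrary hash codes.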
Re: Map side join without broadcast
The closest thing I can think of here is if you have both dataframes written out using buckets. Hive uses this technique for join optimisation such that both datasets of the same bucket are read by the same mapper to achieve map side joins.

On Sat., 29 Jun. 2019, 9:10 pm jelmer wrote:
> I have 2 dataframes,
>
> Dataframe A which contains 1 element per partition that is gigabytes big
> (an index)
>
> Dataframe B which is made up out of millions of small rows.
>
> I want to join B on A but i want all the work to be done on the executors
> holding the partitions of dataframe A
>
> Is there a way to accomplish this without putting dataframe B in a
> broadcast variable or doing a broadcast join ?
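The bucketing idea can be sketched with Spark's own `DataFrameWriter.bucketBy`. This is only an illustration under several assumptions: the table names (`a_bucketed`, `b_bucketed`), the column names and the toy data are made up, Spark runs in local mode with a fresh warehouse directory, and automatic broadcast joins are switched off so that the plan reflects the bucketing rather than the small input size.

```scala
import org.apache.spark.sql.SparkSession

object BucketedJoinSketch {
  // Writes both sides bucketed on the same key and bucket count, joins the
  // resulting tables, and returns the number of joined rows.
  def run(spark: SparkSession): Long = {
    import spark.implicits._

    // Keep Spark from choosing a broadcast join for the tiny sample data.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    val a = Seq((0, "index-0"), (1, "index-1")).toDF("bucket_key", "index")
    val b = Seq((0, "row-a"), (1, "row-b"), (0, "row-c")).toDF("bucket_key", "payload")

    // bucketBy is only supported together with saveAsTable.
    a.write.mode("overwrite").bucketBy(4, "bucket_key").sortBy("bucket_key").saveAsTable("a_bucketed")
    b.write.mode("overwrite").bucketBy(4, "bucket_key").sortBy("bucket_key").saveAsTable("b_bucketed")

    val joined = spark.table("a_bucketed").join(spark.table("b_bucketed"), "bucket_key")
    joined.explain() // inspect the plan to see whether an Exchange (shuffle) remains
    joined.count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("bucketed-join-sketch")
      .getOrCreate()
    try println(run(spark)) finally spark.stop()
  }
}
```

Both tables need the same bucket count and bucketing column for Spark to be able to skip the shuffle; whether it actually does so depends on the Spark version and configuration, which is why the sketch prints the plan instead of asserting anything about it.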
Re: Map side join without broadcast
You can use coalesce(1) or repartition on B, but it would be better to cache A so that it becomes available on all executors as well as in memory, because it contains only one row.

On Sat, Jun 29, 2019 at 4:10 PM jelmer wrote:
> I have 2 dataframes,
>
> Dataframe A which contains 1 element per partition that is gigabytes big
> (an index)
>
> Dataframe B which is made up out of millions of small rows.
>
> I want to join B on A but i want all the work to be done on the executors
> holding the partitions of dataframe A
>
> Is there a way to accomplish this without putting dataframe B in a
> broadcast variable or doing a broadcast join ?

--
Regards,
Arbab Khalil
Software Design Engineer
Map side join without broadcast
I have 2 dataframes:

Dataframe A, which contains 1 element per partition that is gigabytes big (an index).

Dataframe B, which is made up of millions of small rows.

I want to join B on A, but I want all the work to be done on the executors holding the partitions of dataframe A.

Is there a way to accomplish this without putting dataframe B in a broadcast variable or doing a broadcast join?