Efficient algorithm for many-to-many reduce-side join?

2009-05-28 Thread Stuart White
I need to do a reduce-side join of two datasets.  It's a many-to-many
join; that is, each dataset can contain multiple records with any given
key.

Every description of a reduce-side join I've seen involves
constructing your keys in your mapper such that records from one
dataset will be presented to the reducers before records from the
second dataset.  I should hold on to the value from the one dataset
and remember it as I iterate across the values from the second
dataset.
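
In code, that recipe looks roughly like this (just a sketch: TaggedValue is
a stand-in for however you tag records with their source dataset, calling
the first dataset A, and a secondary sort is assumed so the single A value
arrives before the values from the second dataset):

public void reduce(Text joinKey, Iterator<TaggedValue> values,
                   OutputCollector<Text, Text> out, Reporter reporter)
    throws IOException {
  Text aValue = null;
  while (values.hasNext()) {
    TaggedValue v = values.next();
    if (v.isFromA()) {
      aValue = new Text(v.getValue().toString());  // the single A record for this key
    } else if (aValue != null) {
      out.collect(joinKey, new Text(aValue + "\t" + v.getValue()));
    }
  }
}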

This seems like it only works well for one-to-many joins (when one of
your datasets will only have a single record with any given key).
This scales well because you're only remembering one value.

In a many-to-many join, if you apply this same algorithm, you'll need
to remember all values from one dataset, which of course will be
problematic (and won't scale) when dealing with large datasets with
large numbers of records with the same keys.

Does an efficient algorithm exist for a many-to-many reduce-side join?


Re: Efficient algorithm for many-to-many reduce-side join?

2009-05-28 Thread Todd Lipcon
Hi Stuart,

It seems to me like you have a few options.

Option 1: Just use a lot of RAM. Unless you really expect many millions of
entries on both sides of the join, you might be able to get away with
buffering despite its inefficiency.
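
Roughly, the reduce() body becomes something like this (a sketch only, with
a hypothetical TaggedValue wrapper carrying each record's source table, and
the A records sorted ahead of the B records):

List<Text> aValues = new ArrayList<Text>();
while (values.hasNext()) {
  TaggedValue v = values.next();
  if (v.isFromA()) {
    aValues.add(new Text(v.getValue().toString()));  // every A value for this key stays in RAM
  } else {
    for (Text aVal : aValues) {
      out.collect(key, new Text(aVal + "\t" + v.getValue()));  // emit the cross product
    }
  }
}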

Option 2: Use LocalDirAllocator to find some local storage to spill all of
the left table's records to disk in a MapFile format. Then as you iterate
over the right table, do lookups in the MapFile. This is really the same as
option 1, except that you're using disk as an extension of RAM.
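
Sketched out, one way to read that (hypothetical names, error handling
omitted, conf being the JobConf saved in configure(), and a plain
SequenceFile used instead of a MapFile for simplicity) is to spill and
replay inside reduce():

LocalDirAllocator dirs = new LocalDirAllocator("mapred.local.dir");
Path spill = dirs.getLocalPathForWrite("join-spill", conf);
FileSystem localFs = FileSystem.getLocal(conf);
SequenceFile.Writer writer =
    SequenceFile.createWriter(localFs, conf, spill, Text.class, Text.class);

TaggedValue firstB = null;
while (values.hasNext()) {
  TaggedValue v = values.next();
  if (!v.isFromA()) { firstB = v; break; }                 // A values sort ahead of B values
  writer.append(key, new Text(v.getValue().toString()));   // buffer A on disk, not in RAM
}
writer.close();

for (TaggedValue b = firstB; b != null;
     b = values.hasNext() ? values.next() : null) {
  SequenceFile.Reader reader = new SequenceFile.Reader(localFs, spill, conf);
  Text k = new Text();
  Text aVal = new Text();
  while (reader.next(k, aVal)) {                           // replay every spilled A value
    out.collect(key, new Text(aVal + "\t" + b.getValue()));
  }
  reader.close();
}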

Option 3: Convert this to a map-side merge join. Basically what you need to
do is sort both tables by the join key, and partition them with the same
partitioner into the same number of partitions. This way you have an equal
number of part-N files for both tables, and within each part-N file
they're ordered by join key. In each map task, you open both tableA/part-N
and tableB/part-N and do a sequential merge to perform the join. I believe
the CompositeInputFormat class helps with this, though I've never used it.
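
Going from the javadoc (I haven't tried this myself, and the paths and
mapper class below are placeholders), the wiring looks roughly like:

JobConf job = new JobConf(conf, MergeJoin.class);
job.setInputFormat(CompositeInputFormat.class);
job.set("mapred.join.expr",
    CompositeInputFormat.compose("inner", SequenceFileInputFormat.class,
        "/data/tableA", "/data/tableB"));
job.setNumReduceTasks(0);                 // map-only: the merge happens in the mapper
job.setMapperClass(MergeJoinMapper.class);
// the mapper then receives TupleWritable values that pair up the tableA
// and tableB records sharing a join key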

Option 4: Perform the join in several passes. Whichever table is smaller,
break it into pieces that fit in RAM. Unless my relational algebra is off,
A JOIN B = A JOIN (B1 UNION B2) = (A JOIN B1) UNION (A JOIN B2), if
B = B1 UNION B2.
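
As a sketch, the driver is just a loop (runJoinJob() and the paths here are
placeholders; how you split B into RAM-sized pieces is up to you):

Path a = new Path("/data/A");
Path[] bPieces = { new Path("/data/B-part1"), new Path("/data/B-part2") };
for (int i = 0; i < bPieces.length; i++) {
  // each pass is an ordinary reduce-side join of A against one piece of B
  runJoinJob(a, bPieces[i], new Path("/out/join-pass-" + i));
}
// the union of the /out/join-pass-* outputs is A JOIN B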

Hope that helps
-Todd


Re: Efficient algorithm for many-to-many reduce-side join?

2009-05-28 Thread Todd Lipcon
One last possible trick to consider:

If you were to subclass SequenceFileRecordReader, you'd have access to its
seek method, allowing you to rewind the reducer input. You could then
implement a block hash join with something like the following pseudocode:

ahash = new HashMap<Key, Val>();
while (I have RAM available) {
  read a record
  if the record is from table B, break
  put the record into ahash
}
nextAPos = reader.getPos()   // where the next batch of A records would start

while (current record is an A record) {
  skip to next record
}
firstBPos = reader.getPos()  // start of the B records for this key

while (current record has current key) {
  read and join against ahash
  process joined result
}

if (firstBPos > nextAPos) {
  // some A records never made it into ahash; rewind and do another pass
  seek(nextAPos)
  go back to top
}
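
The subclass itself would be tiny; something like this (old mapred API,
untested, so treat it as a sketch):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.SequenceFileRecordReader;

public class SeekableSequenceFileRecordReader<K, V>
    extends SequenceFileRecordReader<K, V> {

  public SeekableSequenceFileRecordReader(Configuration conf, FileSplit split)
      throws IOException {
    super(conf, split);
  }

  // expose the protected seek() so the join loop above can rewind to nextAPos
  public void seekTo(long pos) throws IOException {
    seek(pos);
  }
}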


Re: Efficient algorithm for many-to-many reduce-side join?

2009-05-28 Thread Chris K Wensel
I believe PIG, and I know Cascading, use a kind of 'spillable' list that
can be re-iterated across. PIG's version is a bit more sophisticated,
last I looked.

That said, if you were using either one of them, you wouldn't need to
write your own many-to-many join.


cheers,
ckw


--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com



Re: Efficient algorithm for many-to-many reduce-side join?

2009-05-28 Thread jason hadoop
Use the map-side join stuff. If I understand your problem, it provides a good
solution, but it requires getting over the learning hurdle.
It's well described in chapter 8 of my book :)


-- 
Alpha Chapters of my book on Hadoop are available
http://www.apress.com/book/view/9781430219422
www.prohadoopbook.com a community for Hadoop Professionals