Efficient algorithm for many-to-many reduce-side join?
I need to do a reduce-side join of two datasets. It's a many-to-many join; that is, each dataset can contain multiple records with any given key.

Every description of a reduce-side join I've seen involves constructing the keys in the mapper such that records from one dataset are presented to the reducers before records from the second dataset. I then hold on to the value from the first dataset and remember it as I iterate across the values from the second dataset.

This works well for one-to-many joins (when one of your datasets has only a single record with any given key), and it scales because you're only remembering one value. But if you apply the same algorithm to a many-to-many join, you need to remember all values from one dataset, which of course is problematic (and won't scale) when dealing with large datasets that have large numbers of records with the same keys.

Does an efficient algorithm exist for a many-to-many reduce-side join?
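For concreteness, here's a rough sketch of the reducer for the pattern I'm describing. It assumes the mapper prefixes each value with an 'A' or 'B' tag and that a secondary sort delivers all A records before any B record for a given key (the class name and tagging scheme are just illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // Illustrative only. Assumes the mapper prefixed each value with 'A'
    // or 'B' and a secondary sort delivers all A records before any B.
    public class TaggedJoinReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

      public void reduce(Text key, Iterator<Text> values,
          OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        List<String> aSide = new ArrayList<String>();
        while (values.hasNext()) {
          String v = values.next().toString();
          if (v.charAt(0) == 'A') {
            // This buffer is the scaling problem: it must hold every
            // A-side record for the current key.
            aSide.add(v.substring(1));
          } else {
            String b = v.substring(1);
            for (String a : aSide) {
              output.collect(key, new Text(a + "\t" + b));
            }
          }
        }
      }
    }

The aSide buffer is exactly what blows up in the many-to-many case.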
Re: Efficient algorithm for many-to-many reduce-side join?
Hi Stuart,

It seems to me like you have a few options.

Option 1: Just use a lot of RAM. Unless you really expect many millions of entries on both sides of the join, you might be able to get away with buffering despite its inefficiency.

Option 2: Use LocalDirAllocator to find some local storage and spill all of the left table's records to disk in MapFile format. Then, as you iterate over the right table, do lookups in the MapFile. This is really the same as option 1, except that you're using disk as an extension of RAM.

Option 3: Convert this to a map-side merge join. Basically, you sort both tables by the join key and partition them with the same partitioner into the same number of partitions. This way you have an equal number of part-N files for both tables, and within each part-N file the records are ordered by join key. In each map task, you open both tableA/part-N and tableB/part-N and do a sequential merge to perform the join. I believe the CompositeInputFormat class helps with this, though I've never used it.

Option 4: Perform the join in several passes. Whichever table is smaller, break it into pieces that fit in RAM. Unless my relational algebra is off, if B = B1 UNION B2, then A JOIN B = A JOIN (B1 UNION B2) = (A JOIN B1) UNION (A JOIN B2).

Hope that helps,
-Todd
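For option 3, something like this might be the shape of the driver (untested, since as I said I haven't used CompositeInputFormat; it assumes both tables are SequenceFiles keyed by Text, already sorted and identically partitioned, and the paths are made up):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.SequenceFileInputFormat;
    import org.apache.hadoop.mapred.join.CompositeInputFormat;
    import org.apache.hadoop.mapred.join.TupleWritable;
    import org.apache.hadoop.mapred.lib.IdentityMapper;

    public class MergeJoinDriver {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(MergeJoinDriver.class);
        conf.setJobName("map-side merge join");

        // Both inputs must have the same number of part-N files,
        // identically partitioned and sorted by the join key.
        conf.setInputFormat(CompositeInputFormat.class);
        conf.set("mapred.join.expr", CompositeInputFormat.compose(
            "inner", SequenceFileInputFormat.class,
            new Path("tableA"), new Path("tableB")));

        // Each map call gets the join key plus a TupleWritable holding
        // one value from each table; here the joined tuples are just
        // passed straight through to the output.
        conf.setMapperClass(IdentityMapper.class);
        conf.setNumReduceTasks(0);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(TupleWritable.class);
        FileOutputFormat.setOutputPath(conf, new Path("joined"));

        JobClient.runJob(conf);
      }
    }

The "inner" expression gives an inner join; I believe "outer" and "override" expressions are also supported.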
Re: Efficient algorithm for many-to-many reduce-side join?
One last possible trick to consider: if you were to subclass SequenceFileRecordReader, you'd have access to its seek method, allowing you to rewind the reducer input. You could then implement a block hash join with something like the following pseudocode:

    ahash = new HashMap<Key, Val>()
    while (I have RAM available) {
      read a record
      if the record is from table B, break
      put the record into ahash
    }
    nextAPos = reader.getPos()
    while (current record is an A record) {
      skip to next record
    }
    firstBPos = reader.getPos()
    while (current record has current key) {
      read and join against ahash
      process joined result
    }
    if (firstBPos > nextAPos) {
      seek(nextAPos)
      go back to top
    }
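To make the control flow concrete, here's a self-contained toy version of that loop, with an in-memory list of {table, value} pairs standing in for the seekable reader and a constant standing in for available RAM (in a real job the reader would be the subclassed SequenceFileRecordReader):

    import java.util.ArrayList;
    import java.util.List;

    // Toy rendering of the block hash join loop above, for a single
    // join key whose records arrive A-side first, then B-side.
    public class BlockHashJoinDemo {
      static final int BLOCK = 2;            // "RAM available" stand-in

      public static void main(String[] args) {
        List<String[]> recs = new ArrayList<String[]>();
        recs.add(new String[] {"A", "a1"});
        recs.add(new String[] {"A", "a2"});
        recs.add(new String[] {"A", "a3"});
        recs.add(new String[] {"B", "b1"});
        recs.add(new String[] {"B", "b2"});

        int pos = 0;                         // reader.getPos() stand-in
        while (true) {
          // Hash the next RAM-sized block of A records.
          List<String> ahash = new ArrayList<String>();
          while (pos < recs.size() && ahash.size() < BLOCK
              && recs.get(pos)[0].equals("A")) {
            ahash.add(recs.get(pos++)[1]);
          }
          int nextAPos = pos;                // first un-hashed A record
          while (pos < recs.size() && recs.get(pos)[0].equals("A")) {
            pos++;                           // skip A's that didn't fit
          }
          int firstBPos = pos;
          while (pos < recs.size()) {        // stream B's against the hash
            for (String a : ahash) {
              System.out.println(a + " JOIN " + recs.get(pos)[1]);
            }
            pos++;
          }
          if (firstBPos > nextAPos) {
            pos = nextAPos;                  // seek() stand-in: rewind
          } else {
            break;                           // every A block has been joined
          }
        }
      }
    }

Each pass hashes the next block of A records, then streams the B records against the hash, so memory stays bounded by the block size at the cost of re-reading the B records once per block.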
Re: Efficient algorithm for many-to-many reduce-side join?
I believe Pig, and I know Cascading, use a kind of 'spillable' list that can be re-iterated across. Pig's version is a bit more sophisticated, last I looked.

That said, if you were using either one of them, you wouldn't need to write your own many-to-many join.

cheers,
ckw

--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com
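For the curious, the idea behind such a list is roughly the following bare-bones sketch (an illustration only, not Pig's or Cascading's actual implementation): keep the first N values in RAM and append the overflow to a local temp file, so the whole value list can be re-iterated.

    import java.io.BufferedInputStream;
    import java.io.BufferedOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Bare-bones spillable list of Strings: the first maxInMemory values
    // stay in RAM, the overflow goes to a local temp file, and iterator()
    // replays both. Streams are left unclosed to keep the sketch short.
    public class SpillableList implements Iterable<String> {
      private final int maxInMemory;
      private final List<String> memory = new ArrayList<String>();
      private File spillFile;
      private DataOutputStream spillOut;

      public SpillableList(int maxInMemory) { this.maxInMemory = maxInMemory; }

      public void add(String value) throws IOException {
        if (memory.size() < maxInMemory) {
          memory.add(value);
          return;
        }
        if (spillOut == null) {              // first overflow: open the file
          spillFile = File.createTempFile("spill", ".dat");
          spillFile.deleteOnExit();
          spillOut = new DataOutputStream(new BufferedOutputStream(
              new FileOutputStream(spillFile)));
        }
        spillOut.writeUTF(value);
      }

      public Iterator<String> iterator() {
        try {
          if (spillOut != null) spillOut.flush();  // make spill readable
          final Iterator<String> mem = memory.iterator();
          final DataInputStream in = (spillFile == null) ? null
              : new DataInputStream(new BufferedInputStream(
                  new FileInputStream(spillFile)));
          return new Iterator<String>() {
            public boolean hasNext() {
              if (mem.hasNext()) return true;
              try {
                return in != null && in.available() > 0;
              } catch (IOException e) { return false; }
            }
            public String next() {
              if (mem.hasNext()) return mem.next();
              try {
                return in.readUTF();
              } catch (IOException e) { throw new NoSuchElementException(); }
            }
            public void remove() { throw new UnsupportedOperationException(); }
          };
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }

The real implementations add memory accounting, general serialization, and cleanup; this only shows the spill-and-reiterate shape.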
Re: Efficient algorithm for many-to-many reduce-side join?
Use the map-side join stuff. If I understand your problem, it provides a good solution, but it requires getting over the learning hurdle. It's well described in chapter 8 of my book :)

--
Alpha Chapters of my book on Hadoop are available
http://www.apress.com/book/view/9781430219422
www.prohadoopbook.com - a community for Hadoop Professionals