Exactly!
Thanks Dave for a much better explanation than mine!

-eran



On Thu, Jun 9, 2011 at 00:35, Dave Latham <[email protected]> wrote:

> I believe this is what Eran is suggesting:
>
> Table A
> -------
> Row1 (has joinVal_1)
> Row2 (has joinVal_2)
> Row3 (has joinVal_1)
>
> Table B
> -------
> Row4 (has joinVal_1)
> Row5 (has joinVal_3)
> Row6 (has joinVal_2)
>
> Mapper receives a list of input rows (union of both input tables in any
> order), and produces (=>) intermediate key, value pairs, where the key is
> the join field, and the value is whatever portion of the row you want
> available in your output
>
> Map
> ----------
> A, Row1 => (joinVal_1, [A,Row1])
> A, Row2 => (joinVal_2, [A,Row2])
> A, Row3 => (joinVal_1, [A,Row3])
> B, Row4 => (joinVal_1, [B,Row4])
> B, Row5 => (joinVal_3, [B,Row5])
> B, Row6 => (joinVal_2, [B,Row6])
>
> Shuffle phase partitions and sorts by the map output key (which is the join
> value)
> The Reduce phase then gets a key for the join value and a list of values
> containing all of the input rows (from either table) with that join value.
> It can then perform whatever operations you want (like enumerate the subset
> of the Cartesian product for that join value)
>
> Reduce
> ------------
> joinVal_1, {[A,Row1], [A,Row3], [B,Row4]} => Row1 x Row4, Row3 x Row4
> joinVal_2, {[A,Row2], [B,Row6]} => Row2 x Row6
> joinVal_3, {[B,Row5]} => {}
>
> This sounds effective to me, so long as you can perform any desired
> operations on all the rows matching a single join value via a single
> iteration through the stream of reduce input values (for example, if the
> set of data for each join value fits in memory).  Otherwise you'd need to put
> the list of matches from table A some place that you can iterate over it
> again for each match in table B.
>
> Dave
>
> On Wed, Jun 8, 2011 at 1:45 PM, Buttler, David <[email protected]> wrote:
>
> > Let's make a toy example to see if we can capture all of the edge
> > conditions:
> > Table A
> > -------
> > Key1 joinVal_1
> > Key2 joinVal_2
> > Key3 joinVal_1
> >
> > Table B
> > -------
> > Key4 joinVal_1
> > Key5 joinVal_3
> > Key6 joinVal_2
> >
> > Now, assume that we have a mapper that takes two values, one row from A,
> > and one row from B.  Are you suggesting that we get the following map
> > calls:
> > Key1 & key4
> > Key2 & key5
> > Key3 & key6
> >
> > Or are you suggesting we get the following:
> > Key1 & key4
> > Key1 & key5
> > Key1 & key6
> > Key2 & key4
> > Key2 & key5
> > Key2 & key6
> > Key3 & key4
> > Key3 & key5
> > Key3 & key6
> >
> > Or are you suggesting something different?
> >
> > Dave
> >
> > -----Original Message-----
> > From: [email protected] [mailto:[email protected]] On Behalf Of Eran
> > Kutner
> > Sent: Wednesday, June 08, 2011 11:47 AM
> > To: [email protected]
> > Subject: Re: How to efficiently join HBase tables?
> >
> > I'd like to clarify, again, what I'm trying to do and why I still think
> > it's the best way to do it.
> > I want to join two large tables. I'm assuming, and this is the key to the
> > efficiency of this method, that: 1) I'm getting a lot of data from table A,
> > something which is close enough to a full table scan, and 2) this implies
> > that I will need to join with most of table B as well.
> > All the suggestions from the SQL world are doing lookups, one way or
> > another, in table B. My suggestion is to use the power of the shuffle phase
> > to do the join. It is obviously doable, so I don't understand the statement
> > that it can't be done.
> > So to go over it again:
> > 1. You feed all the rows from table A and B into the mapper.
> > 2. For each row, the mapper should output a new row with a key constructed
> > from the join fields and a value which is the row itself (same as the
> > input value it got).
> > 3. The shuffle phase will make sure all rows with the same values in the
> > join fields will end up together.
> > 4. The reducer will get all the rows for a single set of join field values
> > together and perform the actual join. The reducer can be programmed to do
> > an inner or outer join at this point.
> >
> > I can't prove it without actually writing and testing it but I have a
> > strong feeling this will be much more efficient for large joins than any
> > form of lookup.
> >
> > -eran
> >
> >
> >
> > On Wed, Jun 8, 2011 at 16:01, Doug Meil <[email protected]
> > >wrote:
> >
> > >
> > > Re: "With respect to Doug's posts, you can't do a multi-get off the bat"
> > >
> > > That's an assumption, but you're entitled to your opinion.
> > >
> > > -----Original Message-----
> > > From: Michael Segel [mailto:[email protected]]
> > > Sent: Monday, June 06, 2011 10:08 PM
> > > To: [email protected]
> > > Subject: RE: How to efficiently join HBase tables?
> > >
> > >
> > > Well....
> > >
> > > David is correct.
> > >
> > > Eran wanted to do a join, which is a relational concept that isn't
> > > natively supported by a NoSQL database. A better model would be a
> > > hierarchical model like Dick Pick's Revelation. (UniVerse aka U2 from
> > > Ardent/Informix/IBM/now JRockit?).
> > > And yes, we're looking back 40 some odd years into either a merge/sort
> > > solution or how databases do a relational join. :-)
> > >
> > > Eran wants to do this in a single m/r job. The short answer is you can't.
> > > The longer answer is that if your main class implements Tool Runner, you
> > > can launch two jobs in parallel to get your subsets, and then when they
> > > both complete, you run the join job on them. So I guess it's a single
> > > 'job', or rather app. :-)
> > >
> > > With respect to Doug's posts, you can't do a multi-get off the bat
> > > because in the general case you're not fetching based on the row key but
> > > on a column which is not part of the row key. (It could be a foreign key,
> > > which would mean that at least one of your table fetches will be off the
> > > row key, but you can't guarantee it.)
> > >
> > > So if you don't want to use temp tables, then you have to put your
> > > results in a sorted order, and you still want to get the unique set of
> > > the join-keys, which means you have to run a reduce job. Then you can use
> > > the unique key set and then do the scans. (You can't do a multi-get
> > > because you're doing a scan with a start and stop row(s).)
> > >
> > > The reason I suggest using temp tables if you're going to do a join
> > > operation is that it makes your life easier and is probably faster too.
> > >
> > > Bottom line... I guess many data architects are going to need to rethink
> > > their data models when working on big data. :-)
> > >
> > > -Mike
> > >
> > > PS. If I get a spare moment, I may code this up...
> > >
> > >
> > > > From: [email protected]
> > > > To: [email protected]
> > > > Date: Mon, 6 Jun 2011 17:19:44 -0400
> > > > Subject: RE: How to efficiently join HBase tables?
> > > >
> > > > Re: "So, you all realize the joins have been talked about in the
> > > > database community for 40 years?"
> > > >
> > > > Great point.  What's old is new!    :-)
> > > >
> > > > My suggestion from earlier in the thread was a variant of nested loops
> > > > using multi-get in HTable, which would reduce the number of RPC calls.
> > > > So it's a "bulk-select nested loops" of sorts (i.e., as opposed to the
> > > > 1-by-1 lookup of regular nested loops).
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Buttler, David [mailto:[email protected]]
> > > > Sent: Monday, June 06, 2011 4:30 PM
> > > > To: [email protected]
> > > > Subject: RE: How to efficiently join HBase tables?
> > > >
> > > > So, you all realize the joins have been talked about in the database
> > > > community for 40 years?  There are two main types of joins:
> > > > Nested loops
> > > > Hash table
> > > >
> > > > Mike, in his various emails, seems to be trying to re-imagine how to
> > > > implement both types of joins in HBase (which seems like a reasonable
> > > > goal). I am not exactly sure what Eran is going for here, but it seems
> > > > like Eran is glossing over a piece.  If you have two scanners for table
> > > > A and B, then table B needs to be rescanned for every unique part of
> > > > the join condition in table A.  There are certain ways of improving the
> > > > efficiency of that: the two most obvious are pushing the selection
> > > > criteria down to the scans, and scanning all of the same join values
> > > > from table B at the same time (which requires that Table B's key is the
> > > > join, or a secondary structure that stores the join values as the
> > > > primary order).
> > > >
> > > > Dave
> > > >
> > > > -----Original Message-----
> > > > From: [email protected] [mailto:[email protected]] On Behalf Of
> Eran
> > > > Kutner
> > > > Sent: Friday, June 03, 2011 12:24 AM
> > > > To: [email protected]
> > > > Subject: Re: How to efficiently join HBase tables?
> > > >
> > > > Mike, this is more or less what I tried to describe in my initial
> > > > post, only you explained it much better.
> > > > The problem is that I want to do all of this in one M/R run, not 3, and
> > > > without explicit temp tables. If there was only a way to feed both
> > > > table A and table B into the M/R job then it could be done.
> > > >
> > > > Let's take your query and assumptions, for example.
> > > > So we configure scanner A to return rows where c=xxx and d=yyy. We
> > > > then configure scanner B to return rows where e=zzz. Now we feed all
> > > > those rows to the mapper.
> > > > For each row the mapper gets, it outputs a new key which is "a|b" and
> > > > the same value it received. If either one doesn't exist in the row, the
> > > > mapper doesn't output anything for that row.
> > > > There is an implicit "temp table" created at this stage by Hadoop.
> > > > Now the reducer is run. For every key "a|b" generated by the mapper it
> > > > would get one or more value sets, each one representing a row from the
> > > > original two tables. For simplicity let's assume we got two rows, one
> > > > from table A, the other from table B. Now the reducer can combine the
> > > > two rows and output the combined row. This will work just the same if
> > > > there were multiple rows from each table with the same "a|b" key; in
> > > > that case the reducer would have to generate the Cartesian product of
> > > > all the rows. Outer joins can also be done this way: in an outer join
> > > > you only get one row in the reducer for a given "a|b" key but still
> > > > generate an output.
> > > >
> > > > -eran
> > > >
> > > >
> > > >
> > > > On Fri, Jun 3, 2011 at 00:05, Michael Segel <
> [email protected]
> > > >wrote:
> > > >
> > > > >
> > > > > > Not to beat a dead horse, but I thought a bit more about the problem.
> > > > > > If you want to do this all in HBase using a M/R job...
> > > > > >
> > > > > > Let's define the following:
> > > > > SELECT *
> > > > > FROM A, B
> > > > > WHERE A.a = B.a
> > > > > AND     A.b = B.b
> > > > > AND     A.c = xxx
> > > > > AND     A.d = yyy
> > > > > AND     B.e = zzz
> > > > >
> > > > > Is the sample query.
> > > > >
> > > > > > So our join key is "a|b" because we're matching on columns a and b.
> > > > > > (The pipe is to delimit the columns, assuming the columns themselves
> > > > > > don't contain pipes...)
> > > > >
> > > > > Our filters on A are c and d while e is the filter on B.
> > > > >
> > > > > So we want to do the following:
> > > > >
> > > > > > M/R Map job 1 gets the subset from table A along with a set of unique
> > > > > > keys.
> > > > > > M/R Map job 2 gets the subset from table B along with a set of unique
> > > > > > keys.
> > > > > > M/R Map job 3 takes either set of unique keys as the input list and
> > > > > > you split it based on the number of parallel mappers you want to use.
> > > > >
> > > > > You have a couple of options on how you want to proceed.
> > > > > In each Mapper.map() your input is a unique key.
> > > > > > I guess you could create two scanners, one for tempTableA, and one
> > > > > > for tempTableB.
> > > > > > It looks like you can get the iterator for each result set, and then
> > > > > > for each row in temp table A, you iterate through the result set
> > > > > > from temp table B, writing out the joined set.
> > > > > >
> > > > > > The only problem is that your result set file isn't in sort order.
> > > > > > So I guess you could take the output from this job and reduce it to
> > > > > > get it into sort order.
> > > > >
> > > > > > Option B. Using HDFS files for temp 'tables'.
> > > > > > You can do this... but you would still have to track the unique keys
> > > > > > and also sort both the keys and the files, which will require a
> > > > > > reduce job.
> > > > > >
> > > > > >
> > > > > > Now this is just my opinion, but if I use HBase, I don't have to
> > > > > > worry about using a reducer except to order the final output set.
> > > > > > So I can save the time it takes to do the reduce step. So I have to
> > > > > > ask... how much time is spent by HBase in splitting and compacting
> > > > > > the temp tables?
> > > > > > Also can't you pre-split the temp tables before you use them?
> > > > >
> > > > > Or am I still missing something?
> > > > >
> > > > > > Note: In this example, you'd have to write an input format that
> > > > > > takes a Java list object (or something similar) as your input and
> > > > > > then you can split it to get it to run in parallel.
> > > > > > Or you could just write this on the client and split the list up and
> > > > > > run the join in parallel threads on the client node. Or a single
> > > > > > thread which would mean that it would run and output in sort order.
> > > > >
> > > > > HTH
> > > > >
> > > > > -Mike
> > > > >
> > > > > > Date: Wed, 1 Jun 2011 07:47:30 -0700
> > > > > > Subject: Re: How to efficiently join HBase tables?
> > > > > > From: [email protected]
> > > > > > To: [email protected]
> > > > > >
> > > > > > > you somehow need to flush all in-memory data *and* perform a
> > > > > > > major compaction
> > > > > >
> > > > > > > This makes sense.  Without compaction the linear HDFS scan isn't
> > > > > > > possible.  I suppose one could compact 'offline' in a different
> > > > > > > Map Reduce job.  However that would have its own issues.
> > > > > >
> > > > > > > > The files do have a flag if they were made by a major
> > > > > > > > compaction, so you scan only those and ignore the newer ones -
> > > > > > > > but then you are trailing
> > > > > >
> > > > > > > This could be ok in many cases.  The key would be to create a
> > > > > > > sync'd cut off point enabling a frozen point-in-time 'view' of the
> > > > > > > data.
> > > > > > > I'm not sure how that would be implemented.
> > > > > >
> > > > > > On Wed, Jun 1, 2011 at 6:54 AM, Lars George
> > > > > > <[email protected]>
> > > > > wrote:
> > > > > > > Hi Jason,
> > > > > > >
> > > > > > > > This was discussed in the past, using the HFileInputFormat. The
> > > > > > > > issue is that you somehow need to flush all in-memory data *and*
> > > > > > > > perform a major compaction - or else you would need all the
> > > > > > > > logic of the ColumnTracker in the HFIF. Since that needs to scan
> > > > > > > > all storage files in parallel to achieve its job, the MR task
> > > > > > > > would not really be able to use the same approach.
> > > > > > >
> > > > > > > > Running a major compaction creates a lot of churn, so it is
> > > > > > > > questionable what the outcome is. The files do have a flag if
> > > > > > > > they were made by a major compaction, so you scan only those and
> > > > > > > > ignore the newer ones - but then you are trailing, and you still
> > > > > > > > do not handle delete markers/updates in newer files. No easy feat.
> > > > > > >
> > > > > > > Lars
> > > > > > >
> > > > > > > On Wed, Jun 1, 2011 at 2:41 AM, Jason Rutherglen
> > > > > > > <[email protected]> wrote:
> > > > > > > >>> I'd imagine that join operations do not require realtime-ness,
> > > > > > > >>> and so faster batch jobs using Hive -> frozen HBase files in
> > > > > > > >>> HDFS could be the optimal way to go?
> > > > > > >>
> > > > > > > >> In addition to lessening the load on the perhaps live RegionServer.
> > > > > > > >> There's no Jira for this, I'm tempted to open one.
> > > > > > >>
> > > > > > >> On Tue, May 31, 2011 at 5:18 PM, Jason Rutherglen
> > > > > > >> <[email protected]> wrote:
> > > > > > > >>>> The Hive-HBase integration allows you to create Hive tables
> > > > > > > >>>> that are backed by HBase
> > > > > > > >>>
> > > > > > > >>> In addition, HBase can be made to go faster for MapReduce
> > > > > > > >>> jobs, if the HFiles could be used directly in HDFS, rather than
> > > > > > > >>> proxying through the RegionServer.
> > > > > > > >>>
> > > > > > > >>> I'd imagine that join operations do not require realtime-ness,
> > > > > > > >>> and so faster batch jobs using Hive -> frozen HBase files in
> > > > > > > >>> HDFS could be the optimal way to go?
> > > > > > >>>
> > > > > > >>> On Tue, May 31, 2011 at 1:41 PM, Patrick Angeles <
> > > > > [email protected]> wrote:
> > > > > > >>>> On Tue, May 31, 2011 at 3:19 PM, Eran Kutner <
> [email protected]>
> > > > > wrote:
> > > > > > >>>>
> > > > > > > >>>>> For my need I don't really need the general case, but even if I did I
> > > > > > > >>>>> think it can probably be done simpler.
> > > > > > > >>>>> The main problem is getting the data from both tables into the same MR
> > > > > > > >>>>> job, without resorting to lookups. So without the theoretical
> > > > > > > >>>>> MultiTableInputFormat, I could just copy all the data from both tables
> > > > > > > >>>>> into a temp table, just append the source table name to the row keys to
> > > > > > > >>>>> make sure there are no conflicts. When all the data from both tables is
> > > > > > > >>>>> in the same temp table, run a MR job. For each row the mapper should emit
> > > > > > > >>>>> a key which is composed of all the values of the join fields in that row
> > > > > > > >>>>> (the value can be emitted as is). This will cause all the rows from both
> > > > > > > >>>>> tables, with the same join field values, to arrive at the reducer
> > > > > > > >>>>> together. The reducer could then iterate over them and produce the
> > > > > > > >>>>> Cartesian product as needed.
> > > > > > > >>>>>
> > > > > > > >>>>> I still don't like having to copy all the data into a temp table just
> > > > > > > >>>>> because I can't feed two tables into the MR job.
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > > >>>> Loading the smaller table in memory is called a map join, versus a
> > > > > > > >>>> reduce-side join (a.k.a. common join). One reason to prefer a map join
> > > > > > > >>>> is you avoid the shuffle phase which potentially involves several trips
> > > > > > > >>>> to disk for the intermediate records due to spills, and also once
> > > > > > > >>>> through the network to get each intermediate KV pair to the right
> > > > > > > >>>> reducer. With a map join, everything is local, except for the part where
> > > > > > > >>>> you load the small table.
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>>
> > > > > > > >>>>> As Jason Rutherglen mentioned above, Hive can do joins. I don't know if
> > > > > > > >>>>> it can do them for HBase and it will not suit my needs, but it would be
> > > > > > > >>>>> interesting to know how it is doing them, if anyone knows.
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > > >>>> The Hive-HBase integration allows you to create Hive tables that are
> > > > > > > >>>> backed by HBase. You can do joins on those tables (and also with
> > > > > > > >>>> standard Hive tables). It might be worth trying out in your case as it
> > > > > > > >>>> lets you easily see the load characteristics and the job runtime without
> > > > > > > >>>> much coding investment.
> > > > > > > >>>>
> > > > > > > >>>> There are probably some specific optimizations that can be applied to
> > > > > > > >>>> your situation, but it's hard to say without knowing your use-case.
> > > > > > >>>>
> > > > > > >>>> Regards,
> > > > > > >>>>
> > > > > > >>>> - Patrick
> > > > > > >>>>
> > > > > > >>>>
> > > > > > >>>>> -eran
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>>
> > > > > > >>>>> On Tue, May 31, 2011 at 22:02, Ted Dunning
> > > > > > >>>>> <[email protected]>
> > > > > wrote:
> > > > > > >>>>>
> > > > > > > >>>>> > The Cartesian product often makes an honest-to-god join not such a
> > > > > > > >>>>> > good idea on large data.  The common alternative is co-group which is
> > > > > > > >>>>> > basically like doing the hard work of the join, but involves stopping
> > > > > > > >>>>> > just before emitting the Cartesian product.  This allows you to inject
> > > > > > > >>>>> > whatever cleverness you need at this point.
> > > > > > > >>>>> >
> > > > > > > >>>>> > Common kinds of cleverness include down-sampling of problematically
> > > > > > > >>>>> > large sets of candidates.
> > > > > > >>>>> >
> > > > > > >>>>> > On Tue, May 31, 2011 at 11:56 AM, Michael Segel
> > > > > > >>>>> > <[email protected]>wrote:
> > > > > > >>>>> >
> > > > > > > >>>>> > > So the underlying problem that the OP was trying to solve was how
> > > > > > > >>>>> > > to join two tables from HBase.
> > > > > > > >>>>> > > Unfortunately I goofed.
> > > > > > > >>>>> > > I gave a quick and dirty solution that is a bit incomplete. The row
> > > > > > > >>>>> > > key in the temp table has to be unique and I forgot about the
> > > > > > > >>>>> > > Cartesian product. So my solution wouldn't work in the general case.
> > > > > > >>>>> > >
> > > > > > >>>>> >
> > > > > > >>>>>
> > > > > > >>>>
> > > > > > >>>
> > > > > > >>
> > > > > > >
> > > > >
> > > > >
> > >
> > >
> >
>
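
For anyone who wants to poke at the reduce-side join from the quoted thread
without a cluster, here is a minimal in-memory sketch of it in plain Python.
It is not HBase or Hadoop code: the table contents are the toy rows from the
example, and the function names (map_phase, shuffle, reduce_inner_join,
run_join) are made up for illustration. It only mimics the three phases:
tag each row with its source table, group by join value, then emit the
per-key product of table A rows and table B rows.

```python
from collections import defaultdict
from itertools import product

# Toy tables from the thread's example: row id -> join value.
TABLE_A = {"Row1": "joinVal_1", "Row2": "joinVal_2", "Row3": "joinVal_1"}
TABLE_B = {"Row4": "joinVal_1", "Row5": "joinVal_3", "Row6": "joinVal_2"}


def map_phase(table_name, rows):
    """Emit (join value, (source table, row id)) for every input row."""
    for row_id, join_val in rows.items():
        yield join_val, (table_name, row_id)


def shuffle(pairs):
    """Group map output by key and sort the keys, standing in for the shuffle."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(sorted(groups.items()))


def reduce_inner_join(values):
    """Split one key's values by source table and pair every A row with every B row."""
    from_a = [row for table, row in values if table == "A"]
    from_b = [row for table, row in values if table == "B"]
    return list(product(from_a, from_b))


def run_join():
    """Run map, shuffle and reduce over both toy tables (inner join)."""
    pairs = list(map_phase("A", TABLE_A)) + list(map_phase("B", TABLE_B))
    return {key: reduce_inner_join(vals) for key, vals in shuffle(pairs).items()}


if __name__ == "__main__":
    for key, joined in run_join().items():
        print(key, joined)
```

Note that reduce_inner_join buffers both sides of a key in memory, which is
exactly the caveat raised in the quoted message: it only works as written
when the rows for a single join value fit in memory. An outer join would
differ only in emitting a row when one of the two lists is empty.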
