So the approach to doing this would be as follows:

* test the benefit in a realistic scenario, perhaps with some quick custom
scripts using streaming or plain MR

Assuming the benefits are substantial:

* introduce a copartitioned join type, which is a map-side join that works
when:
  -- all relations being joined produce the same number of splits
  -- there is a guarantee that all instances of a join key are in the same
split
  -- there is a guarantee that the same key will be found in the same split
index for all relations

* introduce a copartitioned merge join type, which adds another constraint:
  -- records in each split of each relation are sorted on the join key

(we may want to implement only the copartitioned merge join, though the
plain copartitioned impl may be useful in certain cases as well).
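To make the constraints above concrete, here is a minimal Python sketch of the copartitioned merge join, outside Hadoop: split i of relation A is joined against split i of relation B with a streaming merge, which is only valid under the constraints listed above (same split count, same key always in the same split index, splits sorted on the key). The split data and hash partitioning are hypothetical.

```python
def merge_join_split(left, right):
    """Merge-join two splits on the first field of each (key, value) record.
    Assumes both splits are sorted on the join key -- the extra constraint
    of the copartitioned merge join."""
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # collect the run of equal keys on each side, emit the cross product
            li = i
            while li < len(left) and left[li][0] == lk:
                li += 1
            rj = j
            while rj < len(right) and right[rj][0] == rk:
                rj += 1
            for l in left[i:li]:
                for r in right[j:rj]:
                    out.append((lk, l[1], r[1]))
            i, j = li, rj
    return out

def copartitioned_merge_join(splits_a, splits_b):
    """Join split i of A with split i of B. Valid only when both relations
    are copartitioned: same number of splits, and any given key lands in
    the same split index in both relations."""
    assert len(splits_a) == len(splits_b)
    return [merge_join_split(a, b) for a, b in zip(splits_a, splits_b)]

# Hypothetical example: 2 splits, keys binned by hash(key) % 2,
# each split sorted on the key.
a = [[(2, "a2"), (4, "a4")], [(1, "a1"), (3, "a3")]]
b = [[(2, "b2"), (2, "b2x")], [(3, "b3"), (5, "b5")]]
print(copartitioned_merge_join(a, b))
# -> [[(2, 'a2', 'b2'), (2, 'a2', 'b2x')], [(3, 'a3', 'b3')]]
```

Note that no sort and no shuffle happens here -- that is where the savings over a regular reduce-side join would come from.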

At that point, we can manually test the speedup in the pig context by
forcing these joins on materialized intermediate data.

The next step would be doing this automatically by tracing partitioning
functions, keys, and parallelism factors for all relations, and deriving the
fact that the join applies.
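A toy sketch of the kind of check the planner would derive, assuming it tracks per-relation metadata about how stored data was last partitioned (the metadata fields and values here are hypothetical):

```python
def copartitioned_join_applies(meta_a, meta_b):
    """Return True when the copartitioned join is provably safe: both
    relations were partitioned by the same function, on the join key,
    with the same parallelism (and hence the same number of splits)."""
    return (meta_a["partitioner"] == meta_b["partitioner"]
            and meta_a["key"] == meta_b["key"]
            and meta_a["parallelism"] == meta_b["parallelism"])

# Hypothetical traced metadata for three stored relations:
a = {"partitioner": "hash", "key": "user_id", "parallelism": 100}
b = {"partitioner": "hash", "key": "user_id", "parallelism": 100}
c = {"partitioner": "hash", "key": "user_id", "parallelism": 50}

print(copartitioned_join_applies(a, b))  # True
print(copartitioned_join_applies(a, c))  # False: split counts differ
```

The real analysis is harder than this, of course -- the partitioning facts have to survive through whatever operators sit between the store and the join.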

This is going to be a lot of work; someone's going to have to be really
motivated to do it. We've had good luck with GSoC students for that sort of
thing :-).

D

On Tue, Aug 30, 2011 at 5:21 PM, Kevin Burton <[email protected]> wrote:

> On Tue, Aug 30, 2011 at 4:19 PM, Dmitriy Ryaboy <[email protected]>
> wrote:
>
> > I am not sure I am following your argument about side files.
> >
> > We store the output of the first group-by to HDFS. Let's say we have 100
> > reducers in that case, so we create 100 files (each replicated 3x).
> >
> > We now read in the second relation and partition it into 100 bins on the
> > mappers. We then start 100 reducers (arbitrarily selected among our
> > nodes). Each node needs to fetch its map outputs (this is always the
> > case). In this optimized workflow, it also needs to fetch one of the 100
> > files we created in the previous job. This will most likely have to go
> > over a network in any decent-sized cluster, as we only have a 3/n chance
> > of a local read here (n being the number of nodes in your cluster). The
> > amount of data we have to read off a remote disk and move to the reducer
> > is mostly the same in both this and the regular scenario. The regular
> > scenario might be a bit worse in the case when its mappers do not get
> > started local to the data.
> >
> >
> Ah... yes.  This is a good point.  This seems to be a flaw in the mapred
> scheduler, but I imagine you would have to hint it ahead of time that you
> will need access to these files so that the map jobs line up with where
> the chunks are written.
>
> It is further complicated by the fact that the only 'pure' machine to read
> the data from is the original box: if this machine fails, you may need to
> read blocks from other machines, and these might not line up with the same
> blocks from the current job or even be on the same hosts.
>
> > The regular scenario also loses out on the fact that it does a fair bit
> > of extra IO -- it reads the previously generated chunk of data, writes
> > it back to disk, and only then ships it. So there is a double IO cost.
> >
> >
> I also think it is impacted by the CPI … in our situation it's CPU
> intensive.
>
>
> > Your proposed optimization also gets some savings from the fact that we
> > don't have to perform a sort of the output of the first job, and we can
> > simply do a merge join in each reducer.
> >
> >
> yes.  I think this is the main speed up as I see it.
>
> So there are really two options (this is the extra work involved).
>
> Option 1 (the current option)
>
>  - perform all the IO on the original file
>  - sort all the data
>  - send all the data over the network to the reducers
>
> Option 2:
>  - read data from potentially the incorrect hosts over the network during
> the reduce phase.
>
> … Option 2 still seems better.  Other than HDFS replication, the network
> shouldn't use as much bandwidth during this phase; it will still 'slow us
> down', but I don't think as much as Option 1.
>
> > Despite all this, my gut feeling is that the 5-10x speedup number is
> > way too optimistic. Is that a guess, or backed by an experiment?
> >
> >
> Mea culpa … I should have explained a bit more.
>
> The 5-10x speedup is because we perform this step repeatedly.
>
> We first do an initial group… then a number of recurring cogroups… usually
> 5-10 of them, but this is just for our initial prototype.  It could be much
> more in production.
>
>
> > We should probably open a jira, try a quick prototype experiment, and
> > then if the prototype is promising, spec out how this can actually be
> > implemented.
> >
>
> I think a modification of my original .pig script (but maybe with a
> significant amount of data) could be used to help prove out the performance
> difference.
>
> Kevin
>
> --
>
> Founder/CEO Spinn3r.com
>
> Location: *San Francisco, CA*
> Skype: *burtonator*
>
> Skype-in: *(415) 871-0687*
>