I posted a similar question a week or two ago but got no responses.
Here's what I've done - I've taken to breaking my computations into stages
and then computing an estimated parallelism very simply. It may not be
optimal, but it is certainly better than allocating 600 nodes for data that
will fit on one.
Roughly speaking, I:
* sum the sizes of all the bzip-compressed data files (sumcompressed)
* multiply by 10 to assume a 10:1 compression ratio
* multiply by 2 to account for roughly 100% overhead from Java object representation
* divide by the amount of RAM allocated to each reduce task via -Dmapred.job.reduce.memory.mb
So far, it seems to work reasonably well. Your mileage may vary; for
instance, adding a scalar parameter to fine-tune the memory estimate might
be useful.
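
For example, here is a quick back-of-the-envelope run of the same formula the
script below uses, with purely illustrative input numbers (30 GB of compressed
input and 1536 MB per reduce task are assumptions, not from a real job):

# Purely illustrative numbers: 30 GB of bzip2 input, 1536 MB of RAM per reduce task.
my $sum_compressed = 30e9;     # total bytes of compressed input
my $memory_mb      = 1536;     # value passed via -Dmapred.job.reduce.memory.mb
my $parallelism    = int(($sum_compressed * 10 * 2) / ($memory_mb * 1.0E6) + 1);
print "$parallelism\n";        # prints 391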
Here's the script I wrote to do the computations. I call it from within
my Pig-in-a-Blanket wrappers for Paths to Success analysis.
Examples:
Get summary statistics for all subdirs under a dir, reported in GB:
./datasize.pl --memory=1536 --units=gb /user/ciemo/pathstosuccess/revamp/paths.summary.daily/*/all-all/query-success-section-url.bz2
Compute the parallelism (number of reducers) required for all data below this path:
./datasize.pl --memory=1536 --units=gb --parallelonly --nosubdirs --nopath /user/ciemo/pathstosuccess/revamp/paths.summary.daily/*/all-all/query-success-section-url.bz2
#!/usr/bin/perl
use Getopt::Long;
my $subdirs = 1;
my $nosubdirs = 0;
my $nopath = 0;
my $nototal = 0;
my $sumonly = 0;
my $units = '';
my $unitscalar = 1.0;
my $unitformat = '%d';
my $memory = 512;
my $parallelonly = 0;
GetOptions(
"subdirs" => \$subdirs,
"nosubdirs" => \$nosubdirs,
"nopath" => \$nopath,
"nototal" => \$nototal,
"sumonly" => \$sumonly,
"parallelonly" => \$parallelonly,
"units=s" => \$units,
"memory=i" => \$memory,
);
if ($units =~ /^kb$/i) { $unitscalar = 1.0E3; $unitformat = '%0.3f'; }
if ($units =~ /^mb$/i) { $unitscalar = 1.0E6; $unitformat = '%0.3f'; }
if ($units =~ /^gb$/i) { $unitscalar = 1.0E9; $unitformat = '%0.3f'; }
if ($units =~ /^tb$/i) { $unitscalar = 1.0E12; $unitformat = '%0.3f'; }
if ($units =~ /^$/i) { $unitscalar = 1.0; $unitformat = '%d'; }
if ($nosubdirs) { $subdirs = 0; }
if ($ARGV[0]) {
$rootpath = $ARGV[0];
} else {
print STDERR "No path specified.\n";
exit 1;
}
# Recursively list every file under the path; one line of output per file or directory.
open (FILES, qq{hadoop fs -lsr $rootpath |});
while (<FILES>) {
$line = $_;
s/\r*\n*$//;
# lsr columns: perms, replication, owner, group, size (bytes), date, time, path
my ($perms, $repl, $user, $group, $bytes, $date, $time, $path) = split(/ +/);
if ($perms =~ /^d/) { next; }
if ($path =~ /\/_temporary/) { next; }
# Group files by their parent directory (strip the trailing part-NNNNN file name).
my $leadpath = $path;
$leadpath =~ s/\/part-[0-9]+[^\/]*$//;
$counts{$rootpath} ++;
$sums{$rootpath} += $bytes;
$sumssq{$rootpath} += $bytes * $bytes;
$mins{$rootpath} = ((defined $mins{$rootpath}) ? (($mins{$rootpath} < $bytes) ? $mins{$rootpath} : $bytes) : $bytes);
$maxs{$rootpath} = ((defined $maxs{$rootpath}) ? (($maxs{$rootpath} > $bytes) ? $maxs{$rootpath} : $bytes) : $bytes);
if (not defined $paths{$leadpath}) {
$paths{$leadpath} = [];
}
push @{$paths{$leadpath}}, $bytes;
$counts{$leadpath} ++;
$sums{$leadpath} += $bytes;
$sumssq{$leadpath} += $bytes * $bytes;
$mins{$leadpath} = ((defined $mins{$leadpath}) ? (($mins{$leadpath} < $bytes) ? $mins{$leadpath} : $bytes) : $bytes);
$maxs{$leadpath} = ((defined $maxs{$leadpath}) ? (($maxs{$leadpath} > $bytes) ? $maxs{$leadpath} : $bytes) : $bytes);
}
# Report on the root path (unless --nototal) and on each subdirectory (if --subdirs).
foreach my $path ( ((not $nototal) ? $rootpath : ()), (($subdirs) ? (sort keys %paths) : ()) ) {
my $sum = $sums{$path};
my $count = $counts{$path};
my $sumsq = $sumssq{$path};
my $min = $mins{$path};
my $max = $maxs{$path};
my $mean = $sum / $count;
my $meansq = $sumsq / $count;
my $var = $meansq - $mean*$mean;
my $stdev = sqrt($var);
my $sterr = $stdev / sqrt($count);
# Estimated reducers: decompressed size (x10) with object overhead (x2), divided by reducer RAM.
my $parallelism = int(($sum * 10 * 2) / ($memory * 1.0E6) + 1);
if (not $nopath) { print $path . "\t"; }
if ($sumonly) {
print join("\t", sprintf($unitformat, $sum / $unitscalar)), "\n";
} elsif ($parallelonly) {
print join("\t", sprintf('%d', $parallelism)), "\n";
} else {
print join("\t", $count,
sprintf($unitformat, $min / $unitscalar),
sprintf($unitformat, $max / $unitscalar),
sprintf($unitformat, $mean / $unitscalar),
sprintf($unitformat, $sum / $unitscalar),
sprintf('%d', $parallelism),
), "\n";
}
}
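
For completeness, here is a minimal sketch of how a wrapper might consume the
--parallelonly output and hand it to Pig. This is illustrative only; the input
glob, the PARALLELISM parameter name, and the Pig script name are assumptions,
not my actual Pig-in-a-Blanket code:

#!/usr/bin/perl
# Illustrative wrapper sketch: estimate parallelism with datasize.pl, then pass
# it to Pig as a parameter that the Pig script can reference in PARALLEL clauses.
use strict;
use warnings;

my $input = '/user/me/somedata/*.bz2';   # hypothetical input glob
my $parallel = `./datasize.pl --memory=1536 --parallelonly --nosubdirs --nopath '$input'`;
chomp $parallel;
$parallel = 1 unless $parallel =~ /^\d+$/ && $parallel > 0;   # fall back to one reducer

# The Pig script would reference this as $PARALLELISM, e.g. "GROUP ... PARALLEL $PARALLELISM;"
system('pig', '-param', "PARALLELISM=$parallel", 'myscript.pig') == 0
    or die "pig exited with status $?";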
On 11/12/09 8:25 AM, "Alan Gates" <[email protected]> wrote:
> I agree that it would be very useful to have a dynamic number of
> reducers. However, I'm not sure how to accomplish it. MapReduce
> requires that we set the number of reducers up front in JobConf, when
> we submit the job. But we don't know the number of maps until
> getSplits is called after job submission. I don't think MR will allow
> us to set the number of reducers once the job is started.
>
> Others have suggested that we use the file size to specify the number
> of reducers. We cannot always assume the inputs are HDFS files (it
> could be from HBase or something). Also different storage formats
> (text, sequence files, zebra) would need different ratios of bytes to
> reducers since they store data at different compression rates. Maybe
> this could still work in the HDFS-only case, assuming that the user
> understands the compression ratios and thus can set the reducer
> input accordingly. But I'm not sure this will be
> simple enough to be useful.
>
> Thoughts?
>
> Alan.
>
>
> On Nov 12, 2009, at 12:12 AM, Jeff Zhang wrote:
>
>> Hi all,
>>
>> Often I will run one script on different data sets, sometimes a small
>> data set and sometimes a large one. And different sizes of data set
>> require different numbers of reducers.
>> I know that the default reduce number is 1, and users can change the
>> reduce number in a script with the PARALLEL keyword.
>>
>> But I do not want to have to change the reduce number in the script
>> each time I run it.
>> So I have an idea: could Pig provide some API that lets users set the
>> ratio between map tasks and reduce tasks (and some new keyword in Pig
>> Latin to set the ratio)?
>>
>> e.g., if I set the ratio to 2:1 and the job has 100 map tasks, it
>> will get 50 reduce tasks accordingly.
>>
>> I think it will be convenient for Pig users.
>>
>>
>> Jeff Zhang
>