[ https://issues.apache.org/jira/browse/PIG-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12772017#action_12772017 ]
Dmitriy V. Ryaboy commented on PIG-1062:
----------------------------------------
I already have ResourceStats hooked up to LogicalOperators, but I still need to
port the code to the new branch. This will let us take statistics, when they are
available, and pass them into the PoissonSampleLoader at initialization time,
so it can get the number of tuples and the average tuple size directly from the
stats.
That being said, statistics may not always be available...
Before I go into the more fanciful suggestion below, perhaps a simple hack
will do. Hadoop already has counters. Is there any reason we can't just read the
"bytes read in map", "records read in map", "bytes written in map", and "records
written in map" counters directly?
If I am overlooking something obvious, here's the "ignore counters" suggestion:
If my understanding is correct, in PoissonSampleLoader we care more about the
average size of a tuple than about the number of tuples: the number of tuples is
only used to crudely estimate the average on-disk tuple size, which in turn is
used to crudely estimate the in-memory tuple size. By the way, this estimate is
likely to be far off if we are loading not from BinStorage but through an
arbitrary LoadFunc, since the underlying data, even if it is a file, might be
compressed.
Perhaps we can get the average tuple size directly instead? We could measure it
in the mappers of the sampling job: at the first getNext() call, force garbage
collection and record memory usage, buffer up K tuples, then record memory usage
again; the difference divided by K approximates the in-memory size of a tuple.
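A minimal Java sketch of the heap-measurement idea, assuming a "tuple" is any Object handed out by a supplier. AvgTupleSizeEstimator and estimate() are hypothetical names, not Pig code. The sketch collects garbage before each measurement and divides the growth in used heap by K. The JVM treats System.gc() only as a hint, so the result is an approximation, not an exact size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical helper (not part of Pig): estimates average in-memory tuple size. */
class AvgTupleSizeEstimator {

    // Heap currently in use, in bytes.
    static long usedHeap() {
        Runtime rt = Runtime.getRuntime();
        return rt.totalMemory() - rt.freeMemory();
    }

    // Buffers k tuples into 'buffer' and returns the estimated bytes per tuple.
    // The caller pre-sizes 'buffer' so its backing array is not counted.
    static long estimate(Supplier<Object> nextTuple, int k, List<Object> buffer) {
        if (k <= 0) throw new IllegalArgumentException("k must be positive");
        System.gc();                      // try to clear unrelated garbage first
        long before = usedHeap();
        for (int i = 0; i < k; i++) {
            buffer.add(nextTuple.get());  // keep references so the tuples stay reachable
        }
        System.gc();                      // drop temporary garbage created while reading
        long after = usedHeap();
        return (after - before) / k;
    }

    public static void main(String[] args) {
        List<Object> buffer = new ArrayList<>(10_000);
        long avg = estimate(() -> new long[16], 10_000, buffer);
        System.out.println(buffer.size() + " tuples buffered, ~" + avg + " bytes each");
    }
}
```

In a sampling mapper the buffered tuples would not be wasted, since they can serve as the start of the sample itself.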
We now have the following variables available to each sampling mapper in the
SkewedPartitioner:
* sample rate S (for the appropriate Poisson distribution)
* total # of mappers, M
* available heap size on the reducer, H
* estimated avg size of tuple, s
The number of tuples each mapper should sample is then simply T = max(10, S*H/(s*M)).
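The formula for T can be written as a small Java helper. SampleSize and tuplesToSample() are hypothetical names, and rounding the result down to a whole number of tuples is an assumption; the comment does not say how to round.

```java
/** Hypothetical helper: T = max(10, S*H/(s*M)). Rounding down is an assumption. */
class SampleSize {

    /**
     * @param sampleRate       S, the sample rate for the Poisson distribution
     * @param reducerHeapBytes H, heap available on the reducer, in bytes
     * @param avgTupleBytes    s, estimated average in-memory tuple size, in bytes
     * @param numMappers       M, total number of sampling mappers
     * @return T, the number of tuples each mapper samples (never fewer than 10)
     */
    static long tuplesToSample(double sampleRate, long reducerHeapBytes,
                               double avgTupleBytes, int numMappers) {
        double t = sampleRate * reducerHeapBytes / (avgTupleBytes * numMappers);
        return Math.max(10L, (long) t);
    }

    public static void main(String[] args) {
        // S = 10, H = 10^8 bytes, s = 100 bytes, M = 10 mappers
        System.out.println(tuplesToSample(10, 100_000_000L, 100, 10)); // 1000000
        // A tiny heap falls back to the floor of 10 tuples.
        System.out.println(tuplesToSample(1, 1_000L, 100, 10));        // 10
    }
}
```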
In getNext(), we can now allocate a buffer for T elements, fill it with the
first T tuples, and continue scanning the partition. On the i-th getNext() call
(i > T, counting from 1), we generate a random integer r such that 0 <= r < i,
and if r < T we replace the tuple at position r of the buffer with the new one.
This is reservoir sampling, and it gives us a uniformly random sample of the
tuples in the partition.
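The buffer-replacement scheme is standard reservoir sampling (Algorithm R). Below is a generic Java sketch of it; ReservoirSampler and offer() are hypothetical names, and in a loader offer() would be called once per tuple from getNext(). With n tuples seen (n >= T), each one ends up in the sample with probability T/n.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SplittableRandom;

/** Hypothetical reservoir sampler (Algorithm R) keeping T = capacity items. */
class ReservoirSampler<E> {
    private final int capacity;          // T
    private final List<E> buffer;
    private final SplittableRandom random;
    private long seen = 0;               // i: number of items offered so far

    ReservoirSampler(int capacity, SplittableRandom random) {
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        this.capacity = capacity;
        this.buffer = new ArrayList<>(capacity);
        this.random = random;
    }

    // Called once per tuple, e.g. from each getNext() call.
    void offer(E item) {
        seen++;
        if (buffer.size() < capacity) {
            buffer.add(item);                   // fill phase: the first T items
        } else {
            long r = random.nextLong(seen);     // uniform, 0 <= r < i
            if (r < capacity) {
                buffer.set((int) r, item);      // replace the item in slot r
            }
        }
    }

    List<E> sample() { return buffer; }

    long seen() { return seen; }

    public static void main(String[] args) {
        ReservoirSampler<Integer> sampler = new ReservoirSampler<>(5, new SplittableRandom(42));
        for (int i = 0; i < 1_000; i++) sampler.offer(i);
        System.out.println(sampler.seen() + " seen, sample = " + sampler.sample());
    }
}
```

SplittableRandom is used because it is seedable and offers nextLong(bound), so the counter can exceed the int range on large partitions.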
This gets around the need for file-size information on the sampling side.
Now, PartitionSkewedKey divides the file size by avg_tuple_disk_size to estimate
the total number of tuples, and combines this estimate with the ratio of a given
key's occurrences in the sample to the total sample size to predict the total
number of records with that key in the input. But given the number of sampled
tuples and the sample rate, couldn't we calculate the total number of records
in the original file by simply inverting the formula used to determine the
number of tuples to sample? If we do this, there is no need to append any
metadata.
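For reference, the current estimate described in the paragraph above can be sketched in Java. SkewedKeyEstimate and its methods are hypothetical names; this is an illustration of the arithmetic, not PartitionSkewedKey's actual code.

```java
/** Hypothetical sketch of the key-count estimate; not Pig's PartitionSkewedKey code. */
class SkewedKeyEstimate {

    // Total tuples ~= file size / average on-disk tuple size.
    static long estimateTotalTuples(long fileSizeBytes, double avgTupleDiskBytes) {
        return Math.round(fileSizeBytes / avgTupleDiskBytes);
    }

    // Records with a key ~= total tuples * (occurrences in sample / sample size).
    static long estimateKeyCount(long keyOccurrencesInSample, long sampleSize, long totalTuples) {
        return Math.round((double) keyOccurrencesInSample * totalTuples / sampleSize);
    }

    public static void main(String[] args) {
        long total = estimateTotalTuples(1_000_000_000L, 100.0);
        System.out.println(total);                               // 10000000
        // A key seen 50 times in a 1,000-tuple sample:
        System.out.println(estimateKeyCount(50, 1_000, total));  // 500000
    }
}
```

Both numbers inherit any error in avgTupleDiskBytes, which is the weak point when the input is compressed.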
Lastly, if we do want to pass around metadata such as the number of input
records, and we don't want to use Hadoop counters, we should extend BinStorage
with ResourceStats serialization and use ResourceStatistics for this. Even if
the original data has no stats, there is no reason we can't generate these
basic counts at runtime for the data we write ourselves.
-D
> load-store-redesign branch: change SampleLoader and subclasses to work with
> new LoadFunc interface
> ---------------------------------------------------------------------------------------------------
>
> Key: PIG-1062
> URL: https://issues.apache.org/jira/browse/PIG-1062
> Project: Pig
> Issue Type: Sub-task
> Reporter: Thejas M Nair
>
> This is part of the effort to implement new load store interfaces as laid out
> in http://wiki.apache.org/pig/LoadStoreRedesignProposal .
> PigStorage and BinStorage are now working.
> SampleLoader and its subclasses, RandomSampleLoader and PoissonSampleLoader,
> need to be changed to work with the new LoadFunc interface.
> Fixing SampleLoader and RandomSampleLoader will get order-by queries working.
> PoissonSampleLoader is used by skew join.
--
This message is automatically generated by JIRA.