Dear Wiki user,
You have subscribed to a wiki page or wiki category on Pig Wiki for change
notification.
The LoadStoreRedesignProposal page has been changed by ThejasNair.
http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=31&rev2=32
--
Mechanism to read side files
Pig needs to read side files in many places like in Merge Join, Order by,
Skew join, dump etc. To facilitate doing this in an easy manner, a utility
!LoadFunc called !ReadToEndLoader has been introduced. Though this has been
implemented as a !LoadFunc, the only !LoadFunc method which is truly
implemented is getNext(). The usage pattern is to construct an instance via a
constructor that takes a reference to the true !LoadFunc (which can read the
side file data), and then to call getNext() repeatedly until it returns null.
The implementation of !ReadToEndLoader hides
the actions of getting !InputSplits from the underlying !InputFormat and then
processing each split by getting the !RecordReader and processing data in the
split before moving to the next.
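As a rough illustration of that usage pattern, here is a self-contained Java sketch that mimics the structure. !MockSplit and ReadToEndSketch are invented stand-ins for Hadoop's !InputSplit/!RecordReader and Pig's !ReadToEndLoader, not the real classes:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Invented stand-in for an InputSplit holding a few records.
class MockSplit {
    final List<String> records;
    MockSplit(String... recs) { this.records = Arrays.asList(recs); }
}

// A ReadToEndLoader-style wrapper: getNext() hides the iteration over
// splits, and over the records within each split, returning null only
// when every split has been consumed.
class ReadToEndSketch {
    private final Iterator<MockSplit> splits;
    private Iterator<String> current;

    ReadToEndSketch(List<MockSplit> allSplits) {
        this.splits = allSplits.iterator();
    }

    String getNext() {
        // Advance to the next non-exhausted split, as the real loader
        // moves to the next InputSplit's RecordReader.
        while (current == null || !current.hasNext()) {
            if (!splits.hasNext()) return null;  // all splits consumed
            current = splits.next().records.iterator();
        }
        return current.next();
    }
}
```

The caller simply loops `while ((t = loader.getNext()) != null) { ... }`, never touching splits or readers directly.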
+
+ Changes to skew join sampling (PoissonSampleLoader)
+ See discussion in [[https://issues.apache.org/jira/browse/PIG-1062|PIG-1062]].
+
+ '''Problem 1''':
+ The earlier version of !PoissonSampleLoader stored the size on disk as an
extra last column in the sampled tuples it returned in the map phase of the
sampling MR job. This was used in the !PartitionSkewedKeys udf in the reduce
stage of the sampling job to compute the total number of tuples as
input-file-size/avg-disk-sz-from-samples. Avg-disk-sz-from-samples is not
available with the new loader design, because getPosition() no longer exists.
+
+ '''Solution :'''
+ !PoissonSampleLoader returns a special tuple with the number of rows in that
map, in addition to the sampled tuples. To create this special tuple, the
maximum row length among the sampled input tuples is tracked, and a new tuple
of size max_row_length + 2 is created, with
+ spl_tuple[max_row_length] = marker_string
+ spl_tuple[max_row_length + 1] = num_rows
+ The size max_row_length + 2 is used because the join key can be an
expression, which is evaluated on the columns in tuples returned by the
sampler, and the expression might expect specific data types to be present in
certain (<= max_row_length) locations of the tuple.
+ If number of tuples in sample is 0, the special tuple is not sent.
+
+ In the !PartitionSkewedKeys udf in the reduce stage, the udf iterates over
the tuples to find these special tuples and calculates the total number of rows.
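A self-contained sketch of the idea (tuples modeled as plain Object arrays; the marker constant and method names here are invented for illustration, not Pig's actual ones):

```java
import java.util.List;

class SpecialTupleSketch {
    // Invented placeholder; the real loader uses its own marker constant.
    static final String MARKER = "NUM_ROWS_TUPLE_MARKER";

    // Map side: build a tuple of size maxRowLength + 2 so that any join-key
    // expression still finds its expected columns at their usual positions.
    static Object[] makeSpecialTuple(int maxRowLength, long numRows) {
        Object[] t = new Object[maxRowLength + 2];
        t[maxRowLength] = MARKER;
        t[maxRowLength + 1] = numRows;
        return t;
    }

    // Reduce side: scan the tuples, pick out the special ones by their
    // marker, and sum their row counts to get the total number of rows.
    static long totalRows(List<Object[]> tuples) {
        long total = 0;
        for (Object[] t : tuples) {
            if (t.length >= 2 && MARKER.equals(t[t.length - 2])) {
                total += (Long) t[t.length - 1];
            }
        }
        return total;
    }
}
```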
+
+
+ '''Problem 2''':
+ !PoissonSampleLoader samples 17 tuples from every set of tuples that will fit
into reducer memory (see PigSkewedJoinSpec). Let us call this number of tuples
that fit into reducer memory X; i.e., we need to sample one tuple every X/17
tuples.
+ Earlier, the number of tuples to be sampled was calculated before the tuples
were read, in !PoissonSampleLoader.computeSamples(..). The number of samples
to be taken in a map was computed as
number-of-reducer-memories-needed * 17 / number-of-splits
+ where
+ number-of-reducer-memories-needed = (total_file_size *
disk_to_mem_factor)/available_reducer_heap_size
+ disk_to_mem_factor has a default of 2.
+
+ Then !PoissonSampleLoader would return sampled tuples by skipping
split-size/num_samples bytes at a time.
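For concreteness, the old calculation can be sketched as follows. The constant 17 and the default disk_to_mem_factor of 2 come from the text above; the class name and the figures in the example are invented:

```java
class OldSampleCountSketch {
    static final int SAMPLES_PER_REDUCER = 17;     // from PigSkewedJoinSpec
    static final double DISK_TO_MEM_FACTOR = 2.0;  // default noted above

    // Old computeSamples(..) logic: how many tuples each map should sample.
    static long samplesPerMap(long totalFileSize, long reducerHeap,
                              int numSplits) {
        // How many reducers' worth of memory the whole input needs.
        double reducerMemsNeeded =
                (totalFileSize * DISK_TO_MEM_FACTOR) / reducerHeap;
        return (long) (reducerMemsNeeded * SAMPLES_PER_REDUCER / numSplits);
    }
}
```

E.g. a 1 GB file, 100 MB of reducer heap, and 10 splits gives (1e9 * 2 / 1e8) * 17 / 10 = 34 samples per map.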
+
+ With the new loader we have to skip some number of tuples instead of bytes,
but we don't have an estimate of the total number of tuples in the input.
+ One way to work around this would be to use the size of a tuple in memory to
estimate its size on disk via the above disk_to_mem_factor; the number of
tuples to be skipped would then be (split-size/avg_mem_size_of_tuple)/numSamples.
+
+ But the use of disk_to_mem_factor is dubious: the real disk_to_mem_factor
varies with the compression algorithm, data characteristics (sorting etc.),
and encoding.
+
+ '''Solution''':
+ The goal is to sample one tuple every X/17 tuples. (X = number of tuples that
fit in available reducer memory)
+ To estimate X, we can use available_reducer_heap_size/average-tuple-mem-size
+ Number of tuples skipped for every sampled tuple = 1/17 * (
available_reducer_heap_size/average-tuple-mem-size)
+
+ The average-tuple-mem-size and the
number-of-tuples-to-be-skipped-per-sampled-tuple are recalculated after each
new tuple is sampled.
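A sketch of this running-average scheme (the class and method names are invented; Pig would estimate the in-memory tuple sizes itself rather than take them as arguments):

```java
class SkipIntervalSketch {
    static final int SAMPLES_PER_REDUCER = 17;

    private final long reducerHeapBytes;
    private long tuplesSampled = 0;
    private double totalMemSize = 0;

    SkipIntervalSketch(long reducerHeapBytes) {
        this.reducerHeapBytes = reducerHeapBytes;
    }

    // Called each time a tuple is sampled; updates the running average of
    // tuple memory size and returns how many tuples to skip before the next
    // sample: (1/17) * (available_reducer_heap_size / average-tuple-mem-size).
    long recordSampleAndGetSkip(long tupleMemSize) {
        tuplesSampled++;
        totalMemSize += tupleMemSize;
        double avgMemSize = totalMemSize / tuplesSampled;
        return (long) (reducerHeapBytes / avgMemSize / SAMPLES_PER_REDUCER);
    }
}
```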
+
+ Changes to order-by sampling (RandomSampler)
+
+ '''Problem''': With the new interface, we cannot use the old approach of
dividing the size of the file by the number of samples required and skipping
that many bytes to get the next sample.
+
+ '''Proposal''':
+ In getNext(), allocate a buffer for T elements, populate it with the first
T tuples, and continue scanning the partition. For every ith next() call,
generate a random number r s.t. 0 <= r < i, and if r < T insert the new tuple
into our buffer at position r. This gives a
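The proposed scheme is reservoir sampling; a minimal self-contained sketch (the fixed seed exists only to keep the example deterministic):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

class ReservoirSketch {
    // Keep a uniform random sample of up to t elements from a stream
    // whose total length is not known in advance.
    static <E> List<E> sample(Iterator<E> stream, int t, long seed) {
        Random rnd = new Random(seed);
        List<E> buffer = new ArrayList<>(t);
        int i = 0;                            // 1-based index of current tuple
        while (stream.hasNext()) {
            E e = stream.next();
            i++;
            if (buffer.size() < t) {
                buffer.add(e);                // first T tuples fill the buffer
            } else {
                int r = rnd.nextInt(i);       // uniform in [0, i)
                if (r < t) buffer.set(r, e);  // replace with probability t/i
            }
        }
        return buffer;
    }
}
```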