Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Pig Wiki" for change 

The "LoadStoreRedesignProposal" page has been changed by ThejasNair.


  ==== Mechanism to read side files ====
  Pig needs to read side files in many places like in Merge Join, Order by, 
Skew join, dump etc. To facilitate doing this in an easy manner, a utility 
!LoadFunc called !ReadToEndLoader has been introduced. Though this has been 
implemented as a !LoadFunc, the only !LoadFunc method which is truly 
implemented is getNext(). The usage pattern is to construct an instance using 
the constructor which would take a reference to the true !LoadFunc (which can 
read the side file data) and then repeatedly call getNext() till null is 
encountered in the return value. The implementation of !ReadToEndLoader hides 
the actions of getting !InputSplits from the underlying !InputFormat and then 
processing each split by getting the !RecordReader and processing data in the 
split before moving to the next.
+ ==== Changes to skew join sampling (PoissonSampleLoader) ====
+ See discussion in [[|PIG-1062]] 
+ '''Problem 1''':
+ Earlier version of !PoissonSampleLoader stored the size on disk as an extra 
last column in the sampled tuples it returned in map phase of sampling MR job. 
This was used in !PartitionSkwewedKeys udf in the reduce stage of sampling job 
to compute total number of tuples using 
input-file-size/avg-disk-sz-from-samples . Avg-disk-sz-from-samples is not 
available with new loader design, because getPosition() is not there.
+ '''Solution :'''
+ !PoissonSampleLoader returns a special tuple with number of rows in that Map, 
in addition to the sampled tuples. To create this special tuple, the max row 
length in input sampled tuples is tracked, and a new tuple with size of 
max_row_length + 2 is created.
+ And spl_tuple[max_row_length ] = "marker_string"
+  spl_tuple[max_row_length + 1] = num_rows
+ The size of max_row_length+2 is used because the join key can be an 
expression, which is evaluated on the columns in tuples returned by the 
sampler, and the expression might expect specific data types to be present in 
certain (<= max_row_length) locations of the tuple.
+ If number of tuples in sample is 0, the special tuple is not sent.
+ In !PartitionSkwewedKeys udf in the reduce stage,the udf iterates over the 
tuples to find these special tuples and calculate the total number of rows.
+ '''Problem 2''':
+ !PoissonSampleLoader samples 17 tuples from every set of tuples that will fit 
into reducer memory (see PigSkewedJoinSpec) . Let us call this number of tuples 
that fit into reducer memory - X. Ie we need to sample one tuple every X/17 
+ Earlier, the number of tuples to be sampled was calculated before the tuples 
were read, in !PoissonSampleLoader.computeSamples(..) . To get the number of 
samples to be sampled in a map, the formula used was = 
number-of-reducer-memories-needed * 17 / number-of-splits
+ Where -
+ number-of-reducer-memories-needed = (total_file_size * 
+ disk_to_mem_factor has default of 2.
+ Then !PoissonSampleLoader would return sampled tuples by  skipping 
split-size/num_samples bytes at a time.
+ With new loader we have to skip some number of tuples instead of bytes. But 
we don't have an estimate of total number of tuples in the input.
+ One way to work around this would be to use size of tuple in memory to 
estimate size of tuple in disk using above disk_to_mem_factor, then number of 
tuples to be skipped will be = (split-size/avg_mem_size_of_tuple)/numSamples
+ But the use of disk_to_mem_factor is very dubious, the real 
disk_to_mem_factor will vary based on compression-algorithm, data 
characteristics (sorting etc), and encoding.
+ '''Solution''':
+ The goal is to sample one tuple every X/17 tuples. (X = number of tuples that 
fit in available reducer memory)
+ To estimate X, we can use available_reducer_heap_size/average-tuple-mem-size
+ Number of tuples skipped for every sampled tuple = 1/17 * ( 
+ The average-tuple-mem-size and 
number-of-tuples-to-be-skippled-every-sampled-tuple is recalculated after a new 
tuple is sampled.
+ ==== Changes to order-by sampling (RandomSampler) ====
+ '''Problem''': With new interface, we cannot use the old approach of dividing 
the size of file by number of samples required and skipping that many bytes to 
get new sample.
+ '''Proposal''': 
+     In getNext(),allocate a buffer for T elements, populate it with the first 
T tuples, and continue scanning the partition. For every ith next() call, 
generate a random number r s.t. 0<=r<i, and if r<T insert the new tuple into 
our buffer at position r. This gives a nicely random sample of the tuples in 
the partition.
  === Remaining Tasks ===
   * !BinStorage needs to implement !LoadMetadata's getSchema() to replace 
current determineSchema()
   * piggybank loaders/storers need to be ported
@@ -610, +655 @@

  Nov 11, Dmitriy Ryaboy
   Minor clarification of meaning of mBytes in !ResourceStatistics
+ Nov 12 2009, Thejas Nair
+ Added sections -
+  * Changes to order-by sampling (!RandomSampler)
+  * Changes to skew join sampling (!PoissonSampleLoader)

Reply via email to