Dear Wiki user, You have subscribed to a wiki page or wiki category on "Pig Wiki" for change notification.

The "LoadStoreRedesignProposal" page has been changed by ThejasNair. http://wiki.apache.org/pig/LoadStoreRedesignProposal?action=diff&rev1=31&rev2=32 -------------------------------------------------- ==== Mechanism to read side files ==== Pig needs to read side files in many places like in Merge Join, Order by, Skew join, dump etc. To facilitate doing this in an easy manner, a utility !LoadFunc called !ReadToEndLoader has been introduced. Though this has been implemented as a !LoadFunc, the only !LoadFunc method which is truly implemented is getNext(). The usage pattern is to construct an instance using the constructor which would take a reference to the true !LoadFunc (which can read the side file data) and then repeatedly call getNext() till null is encountered in the return value. The implementation of !ReadToEndLoader hides the actions of getting !InputSplits from the underlying !InputFormat and then processing each split by getting the !RecordReader and processing data in the split before moving to the next. + + ==== Changes to skew join sampling (PoissonSampleLoader) ==== + See discussion in [[https://issues.apache.org/jira/browse/PIG-1062|PIG-1062]] . + + '''Problem 1''': + Earlier version of !PoissonSampleLoader stored the size on disk as an extra last column in the sampled tuples it returned in map phase of sampling MR job. This was used in !PartitionSkwewedKeys udf in the reduce stage of sampling job to compute total number of tuples using input-file-size/avg-disk-sz-from-samples . Avg-disk-sz-from-samples is not available with new loader design, because getPosition() is not there. + + '''Solution :''' + !PoissonSampleLoader returns a special tuple with number of rows in that Map, in addition to the sampled tuples. To create this special tuple, the max row length in input sampled tuples is tracked, and a new tuple with size of max_row_length + 2 is created. + And spl_tuple[max_row_length ] = "marker_string" + spl_tuple[max_row_length + 1] = num_rows + The size of max_row_length+2 is used because the join key can be an expression, which is evaluated on the columns in tuples returned by the sampler, and the expression might expect specific data types to be present in certain (<= max_row_length) locations of the tuple. + If number of tuples in sample is 0, the special tuple is not sent. + + In !PartitionSkwewedKeys udf in the reduce stage,the udf iterates over the tuples to find these special tuples and calculate the total number of rows. + + + '''Problem 2''': + !PoissonSampleLoader samples 17 tuples from every set of tuples that will fit into reducer memory (see PigSkewedJoinSpec) . Let us call this number of tuples that fit into reducer memory - X. Ie we need to sample one tuple every X/17 tuples. + Earlier, the number of tuples to be sampled was calculated before the tuples were read, in !PoissonSampleLoader.computeSamples(..) . To get the number of samples to be sampled in a map, the formula used was = number-of-reducer-memories-needed * 17 / number-of-splits + Where - + number-of-reducer-memories-needed = (total_file_size * disk_to_mem_factor)/available_reducer_heap_size + disk_to_mem_factor has default of 2. + + Then !PoissonSampleLoader would return sampled tuples by skipping split-size/num_samples bytes at a time. + + With new loader we have to skip some number of tuples instead of bytes. But we don't have an estimate of total number of tuples in the input. + One way to work around this would be to use size of tuple in memory to estimate size of tuple in disk using above disk_to_mem_factor, then number of tuples to be skipped will be = (split-size/avg_mem_size_of_tuple)/numSamples + + But the use of disk_to_mem_factor is very dubious, the real disk_to_mem_factor will vary based on compression-algorithm, data characteristics (sorting etc), and encoding. + + '''Solution''': + The goal is to sample one tuple every X/17 tuples. (X = number of tuples that fit in available reducer memory) + To estimate X, we can use available_reducer_heap_size/average-tuple-mem-size + Number of tuples skipped for every sampled tuple = 1/17 * ( available_reducer_heap_size/average-tuple-mem-size) + + The average-tuple-mem-size and number-of-tuples-to-be-skippled-every-sampled-tuple is recalculated after a new tuple is sampled. + + ==== Changes to order-by sampling (RandomSampler) ==== + + '''Problem''': With new interface, we cannot use the old approach of dividing the size of file by number of samples required and skipping that many bytes to get new sample. + + '''Proposal''': + In getNext(),allocate a buffer for T elements, populate it with the first T tuples, and continue scanning the partition. For every ith next() call, generate a random number r s.t. 0<=r<i, and if r<T insert the new tuple into our buffer at position r. This gives a nicely random sample of the tuples in the partition. + === Remaining Tasks === * !BinStorage needs to implement !LoadMetadata's getSchema() to replace current determineSchema() * piggybank loaders/storers need to be ported @@ -610, +655 @@ Nov 11, Dmitriy Ryaboy Minor clarification of meaning of mBytes in !ResourceStatistics + Nov 12 2009, Thejas Nair + Added sections - + * Changes to order-by sampling (!RandomSampler) + * Changes to skew join sampling (!PoissonSampleLoader) +