Ashutosh Chauhan commented on PIG-845:

Hi Dmitriy,

Thanks for the review. Please find my comments inline.

EndOfAllInput flags - could you add comments here about what the point of this 
flag is? You explain what EndOfAllInputSetter does (which is actually rather 
self-explanatory) but not what the meaning of the flag is and how it's used. 
There is a bit of an explanation in PigMapBase, but it really belongs here.
>> The EndOfAllInput flag indicates that, on the close() call of a map/reduce 
>> task, the pipeline should be run once more. Until now it was used only by 
>> POStream, but now POMergeJoin also makes use of it.
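
A minimal sketch of the behaviour described above, assuming a simplified task 
with map() and close() calls. All class and method names here are hypothetical 
stand-ins, not Pig's actual classes: the operator buffers its input and only 
emits once the flag tells it that no more input will arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not Pig's actual code.
class EndOfAllInputSketch {

    /** An operator that can only emit once it knows all input has been seen. */
    static class BufferingOperator {
        private final List<String> buffered = new ArrayList<>();
        private boolean endOfAllInput = false;

        void setEndOfAllInput(boolean flag) { endOfAllInput = flag; }

        /** Accepts one input record; nothing is emitted while input still arrives. */
        void attachInput(String record) { buffered.add(record); }

        /** Emits pending records only when the flag says input is exhausted. */
        List<String> runPipeline() {
            List<String> out = new ArrayList<>();
            if (endOfAllInput) {
                out.addAll(buffered);
                buffered.clear();
            }
            return out;
        }
    }

    /** Stand-in for a map task: map() per record, close() once at the end. */
    static class MapTask {
        final BufferingOperator op = new BufferingOperator();
        final List<String> emitted = new ArrayList<>();

        void map(String record) {
            op.attachInput(record);
            emitted.addAll(op.runPipeline());
        }

        void close() {
            // Set the flag, then run the pipeline one more time to flush.
            op.setEndOfAllInput(true);
            emitted.addAll(op.runPipeline());
        }
    }

    public static void main(String[] args) {
        MapTask task = new MapTask();
        task.map("a");
        task.map("b");
        System.out.println("before close: " + task.emitted);
        task.close();
        System.out.println("after close: " + task.emitted);
    }
}
```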

Could you explain the relationship between EndOfAllInput and (deleted) POStream?
>> POStream is still there; I guess you are referring to MRStreamHandler, which 
>> is deleted. It is a renaming of the class. Now that POMergeJoin also makes 
>> use of it, it's better to give it a generic name like EndOfAllInput instead 
>> of MRStreamHandler.

Comments in MRCompiler alternate between referring to the left MROp as 
LeftMROper and curMROper. Choose one.
>> Ya, will update the comments.

I am curious about the decision to throw compiler exceptions if MergeJoin 
requirements re number of inputs, etc, aren't satisfied. It seems like a better 
user experience would be to log a warning and fall back to a regular join.
>> Ya, a good suggestion. It would be straightforward to do while parsing 
>> (e.g. when there are more than two inputs), though it's not straightforward 
>> to do at logical-to-physical plan or physical-to-MRJobs translation time.
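
The parse-time fallback discussed above could look roughly like the sketch 
below. It assumes a hypothetical chooseJoin() helper and JoinKind enum (neither 
exists in Pig under these names) and only covers the input-count check.

```java
import java.util.logging.Logger;

// Hypothetical sketch; chooseJoin() and JoinKind are illustrative names only.
class JoinFallbackSketch {
    enum JoinKind { MERGE, REGULAR }

    private static final Logger LOG = Logger.getLogger("JoinFallbackSketch");

    /** Returns the join to use, downgrading MERGE when its precondition fails. */
    static JoinKind chooseJoin(JoinKind requested, int numInputs) {
        if (requested == JoinKind.MERGE && numInputs != 2) {
            // Warn instead of throwing, then fall back to the regular join.
            LOG.warning("Merge join needs exactly two inputs but got " + numInputs
                    + "; falling back to a regular join.");
            return JoinKind.REGULAR;
        }
        return requested;
    }

    public static void main(String[] args) {
        System.out.println(chooseJoin(JoinKind.MERGE, 2));
        System.out.println(chooseJoin(JoinKind.MERGE, 3));
    }
}
```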

Style notes for visitMergeJoin:

It's a 200-line method. Any way you can break it up into smaller components? As 
is, it's hard to follow.
>> I can break it up, but that will bloat the MRCompiler class. A better 
>> idea is to have an MRCompilerHelper or some such class where all the 
>> low-level helper functions live, so that MRCompiler itself is small and thus 
>> easier to read.

The if statements should be broken up into multiple lines to agree with the 
style guides.

Variable naming: you've got topPrj, prj, pkg, lr, ce, nig.. one at a time they 
are fine, but together in a 200-line method they are unreadable. Please 
consider more descriptive names.
>> Will use more descriptive names in next patch.

Kind of a global comment, since it applies to more than just MergeJoin:

It seems to me like we need a Builder for operators to clean up some of the 
new, set, set, set stuff.

Having the setters return this and a Plan's add() method return the plan, would 
let us replace this:

POProject topPrj = new POProject(new 
rightMROpr.reducePlan.connect(pkg, topPrj);

with this:

POProject topPrj = new POProject(new 

rightMROpr.reducePlan.add(topPrj).connect(pkg, topPrj)

>> I agree. In many places there are too many parameters to set. Setters should 
>> return the object instead of being void; chaining would then help cut down 
>> the number of lines.
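
As a rough illustration of the chaining idea, the sketch below uses 
hypothetical Op and Plan classes (stand-ins, not the real POProject or plan 
classes, and with made-up properties): each setter returns this, and add() and 
connect() return the plan.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins; not the real operator or plan classes.
class ChainingSketch {

    static class Op {
        private final String name;
        private int resultType;
        private boolean overloaded;

        Op(String name) { this.name = name; }

        // Each setter returns this so that calls can be chained.
        Op setResultType(int resultType) { this.resultType = resultType; return this; }
        Op setOverloaded(boolean overloaded) { this.overloaded = overloaded; return this; }

        String getName() { return name; }
        int getResultType() { return resultType; }
        boolean isOverloaded() { return overloaded; }
    }

    static class Plan {
        private final List<Op> ops = new ArrayList<>();
        private final List<String> edges = new ArrayList<>();

        // add() returns the plan, so connect() can follow directly.
        Plan add(Op op) { ops.add(op); return this; }

        Plan connect(Op from, Op to) {
            if (!ops.contains(from) || !ops.contains(to)) {
                throw new IllegalArgumentException("both operators must be in the plan");
            }
            edges.add(from.getName() + "->" + to.getName());
            return this;
        }

        List<String> getEdges() { return edges; }
    }

    public static void main(String[] args) {
        Op pkg = new Op("pkg");
        // The result type value is arbitrary here.
        Op topPrj = new Op("topPrj").setResultType(1).setOverloaded(true);
        Plan plan = new Plan().add(pkg).add(topPrj).connect(pkg, topPrj);
        System.out.println(plan.getEdges());
    }
}
```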

Is the change to List<List<Byte>> keyTypes in POFRJoin related to MergeJoin or 
just rolled in?
>> POFRJoin can do without this change, but to avoid code duplication, I updated 
>> POFRJoin to use List<List<Byte>> keyTypes.

8. MergeJoin

break getNext() into components.
>> I don't want to do that because it already has lots of class members which 
>> are updated in various places. Spreading those variables across multiple 
>> functions will make the logic even harder to follow. Also, I am not sure 
>> whether the Java compiler can always inline the private methods.

I don't see you supporting Left outer joins. Plans for that? At least document 
the planned approach.
>> Ya, outer joins are currently not supported. This is documented in the 
>> specification. Will include a comment in the code also.

Error codes being declared deep inside classes, and documented on the wiki, is 
a poor practice, imo. They should be pulled out into PigErrors (as lightweight 
final objects that have an error code, a name, and a description..) I thought 
Santhosh made progress on this already, no?
>> Not sure if I understand you completely. I am using ExecException, 
>> FrontEndException etc. Aren't these the lightweight final objects you are 
>> referring to?
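
One possible reading of the "lightweight final objects" suggestion is a single 
catalogue of errors, each carrying a code, a name, and a description, which 
exceptions then reference. The sketch below assumes that reading; the enum, the 
codes, and the messages are all made up for illustration and are not Pig's.

```java
// Hypothetical sketch; codes, names and messages are invented for illustration.
class PigErrorsSketch {

    /** Central catalogue: each constant has a code, a name, and a description. */
    enum PigError {
        MERGE_JOIN_TOO_MANY_INPUTS(9001, "Merge join accepts exactly two inputs."),
        MERGE_JOIN_OUTER_UNSUPPORTED(9002, "Outer merge join is not supported.");

        private final int code;
        private final String description;

        PigError(int code, String description) {
            this.code = code;
            this.description = description;
        }

        int code() { return code; }
        String description() { return description; }

        String format() {
            return "ERROR " + code + " (" + name() + "): " + description;
        }
    }

    /** An exception that carries a catalogue entry instead of an inline code. */
    static class SketchException extends Exception {
        private final PigError error;

        SketchException(PigError error) {
            super(error.format());
            this.error = error;
        }

        PigError getError() { return error; }
    }

    public static void main(String[] args) {
        try {
            throw new SketchException(PigError.MERGE_JOIN_TOO_MANY_INPUTS);
        } catch (SketchException e) {
            System.out.println(e.getMessage());
        }
    }
}
```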

Could you explain the problem with splits and streams? Why can't this work for 
>> Streaming after the join will be supported. There was a bug which I fixed 
>> and which will be part of the next patch. Streaming before the join will 
>> not be supported because, in the endOfAllInput case, streaming may produce 
>> multiple tuples when runPipeline() is run for the last time. The current 
>> implementation of POMergeJoin assumes that when the endOfAllInput flag is 
>> set, all the input tuples are exhausted and the pipeline is running with 
>> EOP status. To fix that, the logic within POMergeJoin needs to be updated.
On a side note, I found that when there is streaming in a pipeline (with or 
without merge join), the counters reported are always wrong. Reading the code in 
ExecutableManager, I can see why that's the case, but it is certainly confusing 
for the end user.

9. Sampler/Indexer:
Looks like you create the same number of map tasks for this as you do for a 
join; all a sampling map task does is read one record and emit a single tuple. 
That seems wasteful; there is a lot of overhead in setting up these tiny jobs 
which might get stuck behind other jobs running on the cluster, etc. If the 
underlying file has syncpoints, a smaller number of MR tasks can be created. If 
we know the ratio of sample tasks to "full" tasks, we can figure out how many 
records we should emit per job ( ceil(full_tasks/sample_tasks) ). We can 
approximately achieve this by seeking through (end-offset)/num_to_emit and 
doing a sync() after that seek. It's approximate, but close enough for an index.
>> The indexing tasks themselves don't take much time and finish in less than 
>> 10 seconds, while the actual join may run for many hours. So there is hardly 
>> any overhead because of that. But I agree, there is a possibility of these 
>> map tasks getting stuck in an overloaded cluster. Your suggestion seems to 
>> be a good one and will need further investigation. The current indexing 
>> technique is naive and could definitely be improved upon later.
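
The arithmetic in the suggestion above can be sketched as follows, assuming 
byte offsets within a split and hypothetical method names. Only the seek 
positions are computed; a real reader would still have to move forward to the 
next sync point after each seek.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the sampling arithmetic; no file I/O is performed.
class SamplePlanSketch {

    /** ceil(fullTasks / sampleTasks), using integer arithmetic. */
    static int recordsPerSampleTask(int fullTasks, int sampleTasks) {
        if (fullTasks <= 0 || sampleTasks <= 0) {
            throw new IllegalArgumentException("task counts must be positive");
        }
        return (fullTasks + sampleTasks - 1) / sampleTasks;
    }

    /** Evenly spaced positions in [start, end), one per record to emit. */
    static List<Long> seekPositions(long start, long end, int numToEmit) {
        if (end < start || numToEmit <= 0) {
            throw new IllegalArgumentException("invalid range or count");
        }
        long step = (end - start) / numToEmit;
        List<Long> positions = new ArrayList<>();
        for (int i = 0; i < numToEmit; i++) {
            positions.add(start + i * step);
        }
        return positions;
    }

    public static void main(String[] args) {
        int perTask = recordsPerSampleTask(10, 3);
        System.out.println(perTask);
        System.out.println(seekPositions(0L, 1000L, perTask));
    }
}
```

With 10 full tasks and 3 sample tasks, each sample task would emit 4 records, 
at approximately offsets 0, 250, 500 and 750 of a 1000-byte range.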
Consider renaming to something like SortedFileIndexer, since it's conceivable 
that this component can be reused in a context other than a Merge Join.
>> Naming it MergeJoinIndexer does tie it to merge join, but it also makes 
>> explicit who uses it and why. I think we should avoid giving generic 
>> names until there is a need.
Would it make sense to expose this to the users via a 'CREATE INDEX' (or 
similar) command?
That way the index could be persisted, and the user could tell you to use an 
existing index instead of rescanning the data.
>> If we allow that, then we also need to deal with managing and persisting the 
>> index. Once Owl is integrated, we could make use of it to do all this for 
>> Pig. Till then, we can continue creating the index every time; as I said, 
>> the overhead of index creation is negligible compared to the run times of 
>> the actual joins.

I am not sure about the approach of pushing sampling above filters. Have you 
guys benchmarked this? Seems like you'd wind up reading the whole file in the 
sample job if the filter is selective enough (and high filter selectivity would 
also make materialize->sample go much faster).
>> I guess you mean reading the whole file in the join job. That is an 
>> attractive idea and we also thought about it. In the end, we decided not to 
>> do it that way because in that case the index is on different data than the 
>> data you are reading in the join job. Remember, we yank the plan on the 
>> right side and run it within POMergeJoin after reading data from HDFS. In 
>> that case, it's not easy to guarantee the consistency of the index with the 
>> data.

You should test for refusal to do a 3-way join and other error conditions (or a 
warning and successful failover to regular join - my preference).
>> Will include this test.

You should do a proper unit test for the MergeJoinIndexer (or whatever we are 
calling it).
>> Can do that, but this test will require firing multiple map tasks, which 
>> will further prolong the time taken to complete the unit tests. TestMergeJoin 
>> already has 10 tests which take about 10 minutes to finish. Adding more 
>> time-consuming tests will further increase build time. If that is okay, I 
>> can add this one more test.

> -----------------------
>                 Key: PIG-845
>                 URL: https://issues.apache.org/jira/browse/PIG-845
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Olga Natkovich
>            Assignee: Ashutosh Chauhan
>         Attachments: merge-join-1.patch, merge-join-for-review.patch
> This join would work if the data for both tables is sorted on the join key.
