Thank you so so much, Rajesh ~.
After read your explain for question 3) and 4), I have two more questions:
(1)Can tez map outputs'storage ideas be interpreted as MR's. There will be a 
ring buffer to store map output first?
(2)For MRR, does the first reduce vertex's outputs stored in disk? I traced 
logs,but can't diagnose.
(3)As https://tez.apache.org/index.html picture described, throughout the tez 
DAG process, there are just a little data need write to disk,does this means 
just write little data to HDFS?

best wishes & thank you

Maria~.

At 2016-04-15 07:26:59, "Rajesh Balamohan" <[email protected]> wrote:
 


Answers inline


1) how to understand "pipelined shuffle"? Does it is becase the pipeline sort? 
I find some comments about pipelined shuffle in 
ShuffleSchaduler.copySucceeded(),but still cannot fully understand:
      * In case of pipelined shuffle, it is quite possible that fetchers pulled 
the FINAL_UPDATE spill in advance due to smaller output size.  In such 
scenarios, we need to wait until we retrieve all spill
      * details to claim success.
Can you please explain the meaning more?


>>  In case of ordered outputs, only when the data generation (sort+spill+final 
>> merge etc) is complete, data would be available to downstream vertex. With 
>> pipelined shuffle, data can be made available as an when a sorted segment is 
>> available from PipelinedSorter. Also, it avoids the expensive final merge 
>> operation in the producer side. To put it short, as the data is being 
>> genreated in producer, data can made available to downstream for 
>> downloading.  And When overall data generation is over, FINAL_UPDATE event 
>> is sent out. 


Similar case with undered partitioned outputs, where data could be made 
available only when entire data is written out. With PipelinedShuffle, as and 
when the data for a partition is generated, event is sent out downstream for 
consuming.


It is quite possible that the data generated in the final stage is quite small 
as compared with the previous segments. However, in the downstream multiple 
fetchers would be used for downloading the data. For example, assume 5 events 
are generated (4 normal events + 1 FINAL_UPDATE event) and 10 threads are 
available in downstream. In this case, all segments can be theoritically be 
downloaded in parallel and it is quite possible that data pertaining to 
FINAL_UPDATE event could get downloaded faster (could be due to lesser amount 
of data. e.g 1 MB as opposed to 1000 MB in other segments).  In such cases, 
fetchers have to wait until all segments are downloaded (to prevent downstream 
from proceeding with partially downloaded dataset which could lead to 
correctness issues).




2) Are there any other shuffle mode besides pipelined shuffle?  the legacy 
mapreduce shuffle? (I know that tez borrows much of the MR shuffle.)


>>  You can explore more about ordered shuffle in 
>> https://github.com/apache/tez/tree/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped
>>  and unordered shuffle in 
>> https://github.com/apache/tez/tree/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl


3) Where is the map output data stored? how to control its storage,Is there any 
parameters for that?
>>  Map output is stored in disk (one of the directories of 
>> yarn.nodemanager.local-dirs). 
>> https://github.com/apache/tez/blob/dacd0191b684208d71ea457ca849f2d01212bb7e/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java


4) If the map output stored in memory, how does custom vertex and tasks to 
fetch them from memory? And if we do not re-use container,who manage map 
outputs?


>>  Map outputs are stored in disk. Higher level programs can choose to cache 
>> using ObjectRegistry (e.g hashtable loading 
>> https://github.com/apache/hive/blob/26b5c7b56a4f28ce3eabc0207566cce46b29b558/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java#L179).
>>  In which case, when tasks are scheduled on the same container again, they 
>> check if the data is locally present in memory and if not download from 
>> remote again.  In fetcher side, based on the amount of memory available, 
>> downloaded data can be stored in memory/disk for processing.




5) Does one fetcher  corresponds with one mapoutput? And a  fetcher just  pull 
one-time of all the data produced by one map output?


>> Set of fetcher threads are allocated in downstream task and they can 
>> download data from different tasks from the source vertex based on the 
>> allocations from shuffle manager.




On Thu, Apr 14, 2016 at 9:58 PM, Maria <[email protected]> wrote:



Hi, all:

   I have several questions about tez shuffle stage:

1) how to understand "pipelined shuffle"? Does it is becase the pipeline sort? 
I find some comments about pipelined shuffle in 
ShuffleSchaduler.copySucceeded(),but still cannot fully understand:

      * In case of pipelined shuffle, it is quite possible that fetchers pulled 
the FINAL_UPDATE spill in advance due to smaller output size.  In such 
scenarios, we need to wait until we retrieve all spill

      * details to claim success.

Can you please explain the meaning more?



2) Are there any other shuffle mode besides pipelined shuffle?  the legacy 
mapreduce shuffle? (I know that tez borrows much of the MR shuffle.)

3) Where is the map output data stored? how to control its storage,Is there any 
parameters for that?

4) If the map output stored in memory, how does custom vertex and tasks to 
fetch them from memory? And if we do not re-use container,who manage map 
outputs?

5) Does one fetcher  corresponds with one mapoutput? And a  fetcher just  pull 
one-time of all the data produced by one map output?



Any reply will be much appreciated.



Maria~.



-- 

~Rajesh.B

Reply via email to