Sorry, I see the class now. My LogicalOutput would also have to implement OutputCommitter, or presumably have a helper class that does.
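Below is a minimal sketch of such a helper. It assumes the directory layout Gopal gives in the quoted thread (`<dest-dir>/_temporary/<task>/<attempt>/`) and a first-attempt-wins commit check. `AttemptScopedOutput` and `commitOrAbort` are hypothetical names, not Tez APIs. The commit decision is passed in as a `BooleanSupplier` that stands in for the framework's `canCommit()` call. A local directory accessed through `java.nio.file` stands in for HDFS; on HDFS, the move step would use `org.apache.hadoop.fs.FileSystem#rename`. As Bikas and Hitesh point out, an OutputCommitter such as FileOutputCommitter is the framework-supported route. This sketch only illustrates the mechanics.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical helper (not a Tez API): each task attempt writes only under
 * <dest-dir>/_temporary/<task>/<attempt>/ and publishes into <dest-dir> only
 * if the commit check allows it. A local directory stands in for HDFS.
 */
final class AttemptScopedOutput {
    private final Path destDir;
    private final Path attemptDir;

    AttemptScopedOutput(Path destDir, String taskId, String attemptId) throws IOException {
        this.destDir = destDir;
        this.attemptDir = destDir.resolve("_temporary").resolve(taskId).resolve(attemptId);
        Files.createDirectories(attemptDir);
    }

    /** Writes one bucket file into this attempt's private directory. */
    void write(String fileName, List<String> lines) throws IOException {
        Files.write(attemptDir.resolve(fileName), lines, StandardCharsets.UTF_8);
    }

    /**
     * Intended to be called from the Output's close(). If canCommit says yes,
     * moves this attempt's files into destDir; either way, removes the
     * attempt directory. Returns true if the files were published.
     */
    boolean commitOrAbort(BooleanSupplier canCommit) throws IOException {
        boolean allowed = canCommit.getAsBoolean();
        if (allowed) {
            // Snapshot the listing first so we don't move entries mid-iteration.
            List<Path> files;
            try (Stream<Path> listing = Files.list(attemptDir)) {
                files = listing.collect(Collectors.toList());
            }
            for (Path f : files) {
                // No REPLACE_EXISTING: refuse to overwrite already-published output.
                // Within one filesystem this is normally a rename, the local
                // analogue of FileSystem#rename on HDFS.
                Files.move(f, destDir.resolve(f.getFileName()));
            }
        }
        deleteRecursively(attemptDir);
        return allowed;
    }

    /** Deletes a directory tree, children before parents. */
    private static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

With this design, a losing attempt's files never reach `<dest-dir>`. Because `Files.move` is called without `REPLACE_EXISTING`, it also throws rather than overwrite a file that an earlier winner already published.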
On Tue, Jul 22, 2014 at 9:49 AM, Thaddeus Diamond <[email protected]> wrote:
> I see. Now when you say "both output operations write to unique directories", this appears to be something I would have to enforce if I wrote my own LogicalOutput class, correct? Presumably I could add a commit() method to my LogicalOutput implementation that would actually perform the atomic HDFS move when the processor calls it from its close() method. Is that what you would suggest?
>
> On Tue, Jul 22, 2014 at 2:38 AM, Gopal V <[email protected]> wrote:
>> On 7/22/14, 8:19 AM, Thaddeus Diamond wrote:
>>> Also, is OnFileSortedOutput allowed to be terminal? And is it immune to the race conditions described? It seems like flush() is actually writing to a file stream on HDFS without using canCommit(), which would have the aforementioned problems.
>>
>> OnFileSortedOutput is not terminal; it is a local file output mode which writes to the local filesystem, not HDFS. It serves as the connecting edge between two vertices, and it is not even total-order sorted.
>>
>> The scatter-gather edge does total order if you use a TotalOrderPartitioner (or, in MR-like terms, "a reducer").
>>
>> You need something like MROutput, connected as a terminal operator after that total-order scatter-gather (i.e., send all of partition0 to reducer0 and have it write 00000_0).
>>
>> But for the sake of clarity, here is a vague simplification of how we avoid race conditions in the output phase.
>>
>> There is a transactional system, which is similar to lock elision.
>>
>> There is no difference in how that works between task failure scenarios (split network, for instance) and speculation. It will be used if there is more than one attempt for a given task.
>>
>> The interface to that is called Task::canCommit().
>> It will return "true" for the very first task which gets there and false for every attempt after that (well, that's a simplification, but it will do for now).
>>
>> To prevent 2 attempts in flight (due to speculation, or partial writes due to failure) from clobbering each other's output files, both output operations write to unique directories.
>>
>> The unique directory name is something like <dest-dir>/_temporary/<task>/<attempt>/
>>
>> To commit an attempt as the final output, files from its temp dir will be moved to the final output location.
>>
>> For people more familiar with native locking, this is an elision with a visibility criterion instead of true mutual exclusion (i.e., nothing stops two attempts from running in parallel; the first one to finish wins, and the loser's side effects are removed).
>>
>> This is how the system works fast without race conditions.
>>
>> Cheers,
>> Gopal
>>
>>> On Mon, Jul 21, 2014 at 10:15 PM, Thaddeus Diamond <[email protected]> wrote:
>>>> Okay, cool, that makes sense. I will look into the OutputCommitter and OutputFormat, plus upgrade to 0.5.
>>>>
>>>> Next question: in MR, the OutputCollector provides a native way to do global sorting just by specifying the output key. Is there a way with the OutputCommitter or OutputFormat to do that? Basically, I just want a way to do key-value output and have it be sorted on disk in HDFS.
>>>>
>>>> Thanks,
>>>> Thad
>>>>
>>>> On Mon, Jul 21, 2014 at 2:02 PM, Hitesh Shah <[email protected]> wrote:
>>>>> Hi Thad,
>>>>>
>>>>> With respect to speculation-related race conditions, the condition can hold true even if speculation is disabled. A task attempt may lose its connection to the AM and a new attempt may be launched (even though the earlier attempt continues to run). In either scenario, an Output has access to a canCommit() API that should be invoked from the task runtime.
>>>>> This does a check with the AM whether it is the valid attempt before it "commits" its data. For an HDFS-based output, it would usually write all its data into a "/taskId/attemptId/" sub-dir and move its data into the parent "taskId" dir if the canCommit call returns true.
>>>>>
>>>>> thanks
>>>>> — Hitesh
>>>>>
>>>>> On Jul 21, 2014, at 10:51 AM, Bikas Saha <[email protected]> wrote:
>>>>>
>>>>> > If it's the race condition that you are worried about, then you should look at o.a.t.r.a.OutputCommitter.java
>>>>> >
>>>>> > Whenever an output is specified, one can also specify a committer. The committer is executed after all the outputs have been written (at the end of that vertex's completion or at the end of DAG completion). Its job is to "commit" the output in a way that makes sense for that output. E.g., for HDFS or any Hadoop FileSystem output, users typically specify a FileOutputCommitter. It works in conjunction with the FileOutputFormat. FileOutputFormat writes the individual task outputs to sub-dirs of the actual output dir, thus avoiding collisions between speculative executions or re-executions. In the final output commit phase, the valid output files are moved to the actual output dir and the invalid ones are deleted.
>>>>> >
>>>>> > They are available in 0.5. If you are still in the POC phase of your project, it's recommended to work with 0.5 (unreleased), as the APIs have been heavily cleaned up and simplified. Look at OrderedPartitionedKVEdgeConfigurer or UnorderedPartitionedKVEdgeConfigurer.
>>>>> >
>>>>> > From: Thaddeus Diamond [mailto:[email protected]]
>>>>> > Sent: Sunday, July 20, 2014 9:02 PM
>>>>> > To: [email protected]
>>>>> > Cc: [email protected]
>>>>> > Subject: Re: Writing to HDFS from an Output
>>>>> >
>>>>> > Okay, so this means this COULD potentially be a race condition (though presumably if you disable speculative execution via conf it would go away). Would switching over to the new OutputFormat API solve this issue even if I don't use OutputCollector? I do want to be able to leverage speculative execution when it's implemented.
>>>>> >
>>>>> > Quick aside: is PartitionedKeyValue in 0.4.1? I am on branch 0.4.1-incubating (79997ff). Couldn't find it.
>>>>> >
>>>>> > Thanks,
>>>>> > Thad
>>>>> >
>>>>> > On Sun, Jul 20, 2014 at 8:08 PM, Bikas Saha <[email protected]> wrote:
>>>>> > The approach is correct from a purist point of view.
>>>>> >
>>>>> > Since Tez is data-type agnostic, there is no higher-level entity for handling logical data in Tez directly. However, input/output implementations may provide them where it makes sense. E.g., the PartitionedKeyValue outputs allow the specification of a partitioner that can partition key-value data by the key.
>>>>> >
>>>>> > The MRHelper methods are mainly to help with MR compatibility, though some of them are generic KeyValue helper methods that may be moved into a Tez (non-MR) helper utility. So depending on the method you are using, you may still be native Tez.
>>>>> >
>>>>> > IMO, MRInput may fall in the same vein. When dealing with KeyValue data types from disparate sources, the InputFormat and OutputFormat layers form a useful abstraction to do that translation. That's why we decided to include support for them instead of re-defining that translation layer.
>>>>> > This way we can leverage all the existing implementations of getting KV data from HDFS/S3/LocalFiles/ZippedFiles/Text/RC etc.
>>>>> >
>>>>> > Speculation support is not there yet, but it is tracked as a work item for a near-term release, maybe 0.6.
>>>>> >
>>>>> > Bikas
>>>>> >
>>>>> > From: Thaddeus Diamond [mailto:[email protected]]
>>>>> > Sent: Sunday, July 20, 2014 4:27 PM
>>>>> > To: [email protected]
>>>>> > Subject: Writing to HDFS from an Output
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > I'm trying to create a simple I/P/O to do the following:
>>>>> >
>>>>> > Input -> Generate data from Java objects (for now, just random strings)
>>>>> > Processor -> Bucket those strings into output groups
>>>>> > Output -> Write each output group bucket to HDFS in a different file, under the same subdirectory
>>>>> >
>>>>> > My Input and Processor classes are uninteresting in this example. The Output I've created (MyLogicalOutput) implements LogicalOutput and creates a new file directly in HDFS using the Java API. This returns an FSDataOutputStream, which it then writes to.
>>>>> >
>>>>> > My question is this: is this the correct paradigm? I wondered if there were any native Tez abstractions like the OutputCollector in MR.
>>>>> >
>>>>> > Also, does Tez have speculative execution that could cause race conditions?
>>>>> >
>>>>> > I don't want to use MRInput or any of the MRHelpers methods to translate an existing MR job; I want this to be native Tez.
>>>>> >
>>>>> > Thanks!
>>>>> > Thad
>>>>> >
>>>>> > CONFIDENTIALITY NOTICE
>>>>> > NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law.
>>>>> > If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
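Gopal's description of Task::canCommit() in this thread can be modeled in a few lines. Under that description, the first attempt to ask is allowed to commit and every later attempt of the same task is refused. Nothing prevents the attempts from running in parallel. The model below is a hypothetical, in-memory illustration only. `CommitArbiter`, `canCommit(taskId, attemptId)` and `committerOf` are invented names, and Gopal himself calls his description a simplification of what the AM actually does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory model of the first-attempt-wins check behind
 * canCommit(). Not Tez's implementation.
 */
final class CommitArbiter {
    // taskId -> the one attempt allowed to make its output visible
    private final Map<String, String> committers = new ConcurrentHashMap<>();

    /**
     * Returns true for the first attempt of a task to ask (and for that same
     * attempt if it asks again), false for every other attempt of the task.
     */
    boolean canCommit(String taskId, String attemptId) {
        // putIfAbsent is atomic on ConcurrentHashMap: for a given taskId,
        // exactly one caller sees null and becomes the committer.
        String winner = committers.putIfAbsent(taskId, attemptId);
        return winner == null || winner.equals(attemptId);
    }

    /** The attempt chosen to commit for this task, or null if none has asked. */
    String committerOf(String taskId) {
        return committers.get(taskId);
    }
}
```

In this sketch, letting the winning attempt ask again and still get `true` is a deliberate design choice. It means a retried check cannot revoke the attempt's own commit. The losing attempts simply discard their `_temporary` output.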
