The processor is expected to get the commit permission from the AM (be told it’s the valid task) and then call output.commit().
The framework calls output.close() upon task completion irrespective of whether the task succeeded/failed/killed/could-not-commit. close() is for cleanup and not for commit. Bikas *From:* Thaddeus Diamond [mailto:[email protected]] *Sent:* Wednesday, July 23, 2014 1:25 PM *To:* [email protected] *Cc:* Gopal V *Subject:* Re: Writing to HDFS from an Output Also, can the LogicalOutput figure out if it is still valid? I want to commit() as the very last thing in LogicalOutput#close(). On Wed, Jul 23, 2014 at 12:56 PM, Thaddeus Diamond < [email protected]> wrote: Oh wait, OutputCommitter is called from the AM looking at the javadoc. Does it have a list of successful attempt id's or something that could allow it to iterate through batches and move files? On Tue, Jul 22, 2014 at 4:13 PM, Thaddeus Diamond < [email protected]> wrote: Okay I'll look at that. Continuing down the thought of an OutputCommitter, what happens if my code looks as follows: Processor -------------- if (context.canCommit()) { outputCommitter.commit(); } OutputCommitter -------------- void commit() { hdfsClient.move(temporaryFile, finalDestination); } What happens if the JVM crashes between the if statement and the commit() function call. Will any other tasks be able to commit? Will my job just fail or will there be incomplete results? On Tue, Jul 22, 2014 at 1:21 PM, Bikas Saha <[email protected]> wrote: That’s correct. That’s why you should probably use MROutput to write to HDFS if you data is already Key-Value. Don’t think of MROutput as MR compatibility. Its more like a Key-Value data writer using the OutputFormat abstraction. We should perhaps change the name of MROutput to remove the MR connotation. Of course, if your data is not KeyValue or you have some other special needs, then yes you need to write your own output. Bikas *From:* Thaddeus Diamond [mailto:[email protected]] *Sent:* Tuesday, July 22, 2014 6:53 AM *To:* Gopal V *Cc:* [email protected] *Subject:* Re: Writing to HDFS from an Output Sorry, see the class now. My LogicalOutput would also have to implement OutputCommitter or presumably have a helper class that does. On Tue, Jul 22, 2014 at 9:49 AM, Thaddeus Diamond < [email protected]> wrote: I see. Now when you say "both output operations write to unique directories", this appears to be something I would have to enforce if I wrote my own LogicalOutput class, correct? Presumably I could add a commit() method to my LogicalOutput implementation that would actually perform the atomic HDFS move when the processor calls it from its close() method. Is that what you would suggest? On Tue, Jul 22, 2014 at 2:38 AM, Gopal V <[email protected]> wrote: On 7/22/14, 8:19 AM, Thaddeus Diamond wrote: Also is OnFileSortedOutput allowed to be terminal? And is it immune to the race conditions described? It seems like flush() is actually writing to a file stream on HDFS, without using canCommit(), which would have the aforementioned problems. OnFileSortedOutput is not terminal, it is a local file output mode which writes to the local filesystem, not HDFS. That serves as the connecting edge between two vertices and it is not even total order sorted. The scatter-gather edge does total order, if you use a TotalOrderPartitioner (or in MR-like terms "a reducer"). You need something like MROutput, connected as a terminal operator after that total order scatter-gather (i.e send all partition0 to reducer0, have it write 00000_0). But for the sake of clarity, here is a vague simplification of how we avoid race conditions in output phase. There is a transactional system, which is similar to lock elision. There is no difference between how that works between task failure scenarios (split network, for instance) or speculation. It will be used if there are more than one attempt for a given task. The interface to that is called Task::canCommit(). It will return "true" for the very first task which gets there and false for every attempt after that (well, that's a simplification, but it sort of will do for now). To prevent clobbering each other's output files when 2 attempts are in flight (due to speculation or partial writes due to failure), both output operations write to unique directories. The unique directory name is something like <dest-dir>/_temporary/<task>/<attempt>/ To commit an attempt as the final output, files from its temp dir will be moved to the final output location. For people more familiar with native locking, this is an elision with a visibility criteria instead of being truly mutual exclusion (i.e nothing stops two attempts from running in parallel, the first one to finish wins, the loser's side-effects are removed). This is how the system works fast without race conditions. Cheers, Gopal On Mon, Jul 21, 2014 at 10:15 PM, Thaddeus Diamond < [email protected]> wrote: Okay cool that makes sense. I will look into the OutputCommitter and OutputFormat plus upgrade to 0.5. Next question: In MR the OutputCollector provides a native way to do global sorting just by specifying the output key. Is there a way with the OutputCommitter or OutputFormat to do that? Basically I just want a way to do key-value output and have it be HDFS-sorted on disk. Thanks, Thad On Mon, Jul 21, 2014 at 2:02 PM, Hitesh Shah <[email protected]> wrote: Hi Thad, With respect to speculation related race conditions, the condition can hold true even if speculation is disabled. A task attempt may lose connection to the AM and a new attempt launched ( even though the earlier attempt continues to run). In either scenario, an Output has access to a canCommit() API that should be invoked from the task runtime. This does a check with the AM whether it is the valid attempt before it “commits” its data. For an HDFS based output, it would usually write all its data into a “/taskId/attemptId/” sub-dir and move its data into the parent “taskId” dir if the canCommit call returns true. thanks — Hitesh On Jul 21, 2014, at 10:51 AM, Bikas Saha <[email protected]> wrote: > If it’s the race condition that you are worried about then you should look at o.a.t.r.a. OutputCommitter.java > > Whenever an output is specified, one can also specify a committer. The committer is executed after all the outputs have been written (at the end of that vertex completion or at the end of dag completion). Its job is to “commit” the output in a way that makes sense for that output. E.g. for HDFS or any Hadoop Filesystem output, users typically specify a FileOutputCommitter. It works in conjunction with the FileOutputFormat. FileOutputFormat writes the individual task outputs to sub-dirs of the actual output dir, thus avoiding collisions between speculative executions or re-executions. In the final output commit phase, the valid output files are moved to the actual output dir and the invalid ones are deleted. > > They are available in 0.5. If you are still in the POC phase of your project, its recommended to work with the 0.5 (unreleased) as the APIs have been heavily cleaned up and simplified. Look at OrderedPartitionedKVEdgeConfigurer or UnorderedPartitionedKVEdgeConfigurer. > > From: Thaddeus Diamond [mailto:[email protected]] > Sent: Sunday, July 20, 2014 9:02 PM > To: [email protected] > Cc: [email protected] > Subject: Re: Writing to HDFS from an Output > > Okay so this means potentially this COULD be a race condition (though presumably if you disable the speculative execution via conf it would go away). Would switching over to the new OutputFormat API solve this issue even if I don't use OutputCollector? I do want to be able to leverage SE when it's implemented. > > Quick aside: Is PartitionedKeyValue in 0.4.1? I am on branch 0.4.1-incubating (79997ff). Couldn't find it. > > Thanks, > Thad > > On Sun, Jul 20, 2014 at 8:08 PM, Bikas Saha <[email protected]> wrote: > The approach is correct from a purist point of view. > > Since Tez is data-type agnostic, there is no higher level entity for handling logical data in Tez directly. However, input/output implementations may provide them where it makes sense. Eg. The PartitionedKeyValue outputs allow the specification of a partitioner that can partition key value data by the key. > > The MRHelper methods are mainly to help with MR compatibility, though some of them are generic KeyValue helper methods that may be moved into a Tez (non-MR) helper utility. So depending on the method you are using, you may still be native Tez. > > IMO, MRInput may fall in the same vein. When dealing with KeyValue data types from disparate sources, the InputFormat and OutputFormat layers form a useful abstraction to do that translation. That’s why we decided to include support for them instead of re-defining that translation layer. This way we can leverage all the existing implementations of getting KV data from HDFS/S3/LocalFiles/ZippedFiles/Text/RC etc. > > Speculation support is not there but tracked as a work item for a near term release, may be 0.6. > > Bikas > > From: Thaddeus Diamond [mailto:[email protected]] > Sent: Sunday, July 20, 2014 4:27 PM > To: [email protected] > Subject: Writing to HDFS from an Output > > Hi, > > I'm trying to create a simple I/P/O to do the following: > > Input -> Generate data from Java objects (for now, just random strings) > Processor -> Bucket those strings into output groups > Output -> Write each output group bucket to HDFS in a different file, under the same subdirectory > > I have Input and Processor classes are uninteresting in this example. The Output I've created (MyLogicalOutput) implements LogicalOutput and creates a new file directly in HDFS using the Java API. This returns an FSDataOutputStream, which it then writes to. > > My question is this: is this the correct paradigm? I wondered if there were any native Tez abstractions like the OutputCollector in MR. > > Also, does Tez have speculative execution that could cause race conditions? > > I don't want to use MRInput or any of the MRHelpers methods to translate an existing MR job, I want this to be native Tez. > > Thanks! > Thad > > CONFIDENTIALITY NOTICE > NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You. > > > CONFIDENTIALITY NOTICE > NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You. CONFIDENTIALITY NOTICE NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You. -- CONFIDENTIALITY NOTICE NOTICE: This message is intended for the use of the individual or entity to which it is addressed and may contain information that is confidential, privileged and exempt from disclosure under applicable law. If the reader of this message is not the intended recipient, you are hereby notified that any printing, copying, dissemination, distribution, disclosure or forwarding of this communication is strictly prohibited. If you have received this communication in error, please contact the sender immediately and delete it from your system. Thank You.
