Hi Thad,

With respect to speculation-related race conditions, the condition can hold true even if speculation is disabled. A task attempt may lose its connection to the AM, and a new attempt may be launched (even though the earlier attempt continues to run). In either scenario, an Output has access to a canCommit() API that should be invoked from the task runtime. This checks with the AM whether it is the valid attempt before it “commits” its data. For an HDFS-based output, it would usually write all its data into a “/taskId/attemptId/” sub-dir and move its data into the parent “taskId” dir if the canCommit call returns true.
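
The attempt-directory pattern described above can be sketched in plain Java. This is only a minimal sketch under stated assumptions, not Tez code: it uses java.nio.file on the local filesystem in place of HDFS, the CommitArbiter interface is a hypothetical stand-in for the AM-side check behind canCommit(), and the task, attempt, and file names are made up for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical stand-in for the AM-side validity check; NOT the Tez API. */
interface CommitArbiter {
    boolean canCommit(String taskId, String attemptId);
}

class AttemptCommitSketch {

    /** Writes data to <outputDir>/<taskId>/<attemptId>/<fileName> so attempts never collide. */
    static Path writeAttemptOutput(Path outputDir, String taskId, String attemptId,
                                   String fileName, String data) throws IOException {
        Path attemptDir = outputDir.resolve(taskId).resolve(attemptId);
        Files.createDirectories(attemptDir);
        return Files.write(attemptDir.resolve(fileName), data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Moves the attempt's files up into the task dir only if the arbiter
     * says this attempt is the valid one. Returns true if it committed.
     */
    static boolean commitIfAllowed(Path outputDir, String taskId, String attemptId,
                                   CommitArbiter arbiter) throws IOException {
        if (!arbiter.canCommit(taskId, attemptId)) {
            return false; // not the valid attempt: leave its files in the attempt dir
        }
        Path taskDir = outputDir.resolve(taskId);
        Path attemptDir = taskDir.resolve(attemptId);
        List<Path> files;
        try (Stream<Path> listing = Files.list(attemptDir)) {
            files = listing.collect(Collectors.toList());
        }
        for (Path file : files) {
            // Without options, Files.move fails if the target already exists.
            Files.move(file, taskDir.resolve(file.getFileName()));
        }
        Files.delete(attemptDir); // now empty
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path out = Files.createTempDirectory("output");
        // Pretend the AM considers attempt_1 the only valid attempt.
        CommitArbiter arbiter = (task, attempt) -> attempt.equals("attempt_1");

        writeAttemptOutput(out, "task_0", "attempt_0", "part-0", "stale");
        writeAttemptOutput(out, "task_0", "attempt_1", "part-0", "fresh");

        System.out.println(commitIfAllowed(out, "task_0", "attempt_0", arbiter)); // false
        System.out.println(commitIfAllowed(out, "task_0", "attempt_1", arbiter)); // true
        byte[] committed = Files.readAllBytes(out.resolve("task_0").resolve("part-0"));
        System.out.println(new String(committed, StandardCharsets.UTF_8)); // fresh
    }
}
```

In this sketch a rejected attempt simply leaves its files under its own attempt directory; a real output would also need to decide who cleans those up.
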

thanks
-- Hitesh

On Jul 21, 2014, at 10:51 AM, Bikas Saha <[email protected]> wrote:

> If it’s the race condition that you are worried about then you should look at
> o.a.t.r.a.OutputCommitter.java
>
> Whenever an output is specified, one can also specify a committer. The
> committer is executed after all the outputs have been written (at the end of
> that vertex completion or at the end of dag completion). Its job is to
> “commit” the output in a way that makes sense for that output. E.g. for HDFS
> or any Hadoop Filesystem output, users typically specify a
> FileOutputCommitter. It works in conjunction with the FileOutputFormat.
> FileOutputFormat writes the individual task outputs to sub-dirs of the actual
> output dir, thus avoiding collisions between speculative executions or
> re-executions. In the final output commit phase, the valid output files are
> moved to the actual output dir and the invalid ones are deleted.
>
> They are available in 0.5. If you are still in the POC phase of your project,
> it’s recommended to work with 0.5 (unreleased), as the APIs have been
> heavily cleaned up and simplified. Look at OrderedPartitionedKVEdgeConfigurer
> or UnorderedPartitionedKVEdgeConfigurer.
>
> From: Thaddeus Diamond [mailto:[email protected]]
> Sent: Sunday, July 20, 2014 9:02 PM
> To: [email protected]
> Cc: [email protected]
> Subject: Re: Writing to HDFS from an Output
>
> Okay so this means potentially this COULD be a race condition (though
> presumably if you disable the speculative execution via conf it would go
> away). Would switching over to the new OutputFormat API solve this issue
> even if I don't use OutputCollector? I do want to be able to leverage SE
> when it's implemented.
>
> Quick aside: Is PartitionedKeyValue in 0.4.1? I am on branch
> 0.4.1-incubating (79997ff). Couldn't find it.
>
> Thanks,
> Thad
>
> On Sun, Jul 20, 2014 at 8:08 PM, Bikas Saha <[email protected]> wrote:
> The approach is correct from a purist point of view.
>
> Since Tez is data-type agnostic, there is no higher level entity for handling
> logical data in Tez directly. However, input/output implementations may
> provide them where it makes sense. E.g. the PartitionedKeyValue outputs allow
> the specification of a partitioner that can partition key value data by the
> key.
>
> The MRHelper methods are mainly to help with MR compatibility, though some of
> them are generic KeyValue helper methods that may be moved into a Tez
> (non-MR) helper utility. So depending on the method you are using, you may
> still be native Tez.
>
> IMO, MRInput may fall in the same vein. When dealing with KeyValue data types
> from disparate sources, the InputFormat and OutputFormat layers form a useful
> abstraction to do that translation. That’s why we decided to include support
> for them instead of re-defining that translation layer. This way we can
> leverage all the existing implementations of getting KV data from
> HDFS/S3/LocalFiles/ZippedFiles/Text/RC etc.
>
> Speculation support is not there but is tracked as a work item for a near-term
> release, maybe 0.6.
>
> Bikas
>
> From: Thaddeus Diamond [mailto:[email protected]]
> Sent: Sunday, July 20, 2014 4:27 PM
> To: [email protected]
> Subject: Writing to HDFS from an Output
>
> Hi,
>
> I'm trying to create a simple I/P/O to do the following:
>
> Input -> Generate data from Java objects (for now, just random strings)
> Processor -> Bucket those strings into output groups
> Output -> Write each output group bucket to HDFS in a different file, under
> the same subdirectory
>
> The Input and Processor classes are uninteresting in this example. The
> Output I've created (MyLogicalOutput) implements LogicalOutput and creates a
> new file directly in HDFS using the Java API. This returns an
> FSDataOutputStream, which it then writes to.
>
> My question is this: is this the correct paradigm? I wondered if there were
> any native Tez abstractions like the OutputCollector in MR.
>
> Also, does Tez have speculative execution that could cause race conditions?
>
> I don't want to use MRInput or any of the MRHelpers methods to translate an
> existing MR job; I want this to be native Tez.
>
> Thanks!
> Thad
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity to
> which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader of
> this message is not the intended recipient, you are hereby notified that any
> printing, copying, dissemination, distribution, disclosure or forwarding of
> this communication is strictly prohibited. If you have received this
> communication in error, please contact the sender immediately and delete it
> from your system. Thank You.
