Okay I'll look at that. Continuing down the thought of an OutputCommitter,
what happens if my code looks as follows:
Processor
--------------
if (context.canCommit()) {
outputCommitter.commit();
}
OutputCommitter
--------------
void commit() {
hdfsClient.move(temporaryFile, finalDestination);
}
What happens if the JVM crashes between the if statement and the commit()
function call. Will any other tasks be able to commit? Will my job just
fail or will there be incomplete results?
On Tue, Jul 22, 2014 at 1:21 PM, Bikas Saha <[email protected]> wrote:
> That’s correct. That’s why you should probably use MROutput to write to
> HDFS if you data is already Key-Value. Don’t think of MROutput as MR
> compatibility. Its more like a Key-Value data writer using the OutputFormat
> abstraction. We should perhaps change the name of MROutput to remove the MR
> connotation.
>
>
>
> Of course, if your data is not KeyValue or you have some other special
> needs, then yes you need to write your own output.
>
>
>
> Bikas
>
>
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Tuesday, July 22, 2014 6:53 AM
> *To:* Gopal V
> *Cc:* [email protected]
>
> *Subject:* Re: Writing to HDFS from an Output
>
>
>
> Sorry, see the class now. My LogicalOutput would also have to implement
> OutputCommitter or presumably have a helper class that does.
>
>
>
> On Tue, Jul 22, 2014 at 9:49 AM, Thaddeus Diamond <
> [email protected]> wrote:
>
> I see. Now when you say "both output operations write to unique
> directories", this appears to be something I would have to enforce if I
> wrote my own LogicalOutput class, correct? Presumably I could add a
> commit() method to my LogicalOutput implementation that would actually
> perform the atomic HDFS move when the processor calls it from its close()
> method. Is that what you would suggest?
>
>
>
> On Tue, Jul 22, 2014 at 2:38 AM, Gopal V <[email protected]> wrote:
>
> On 7/22/14, 8:19 AM, Thaddeus Diamond wrote:
>
> Also is OnFileSortedOutput allowed to be terminal? And is it immune to the
> race conditions described? It seems like flush() is actually writing to a
> file stream on HDFS, without using canCommit(), which would have the
> aforementioned problems.
>
>
>
> OnFileSortedOutput is not terminal, it is a local file output mode which
> writes to the local filesystem, not HDFS. That serves as the connecting
> edge between two vertices and it is not even total order sorted.
>
> The scatter-gather edge does total order, if you use a
> TotalOrderPartitioner (or in MR-like terms "a reducer").
>
> You need something like MROutput, connected as a terminal operator after
> that total order scatter-gather (i.e send all partition0 to reducer0, have
> it write 00000_0).
>
> But for the sake of clarity, here is a vague simplification of how we
> avoid race conditions in output phase.
>
> There is a transactional system, which is similar to lock elision.
>
> There is no difference between how that works between task failure
> scenarios (split network, for instance) or speculation. It will be used if
> there are more than one attempt for a given task.
>
> The interface to that is called Task::canCommit(). It will return "true"
> for the very first task which gets there and false for every attempt after
> that (well, that's a simplification, but it sort of will do for now).
>
> To prevent clobbering each other's output files when 2 attempts are in
> flight (due to speculation or partial writes due to failure), both output
> operations write to unique directories.
>
> The unique directory name is something like
> <dest-dir>/_temporary/<task>/<attempt>/
>
> To commit an attempt as the final output, files from its temp dir will be
> moved to the final output location.
>
> For people more familiar with native locking, this is an elision with a
> visibility criteria instead of being truly mutual exclusion (i.e nothing
> stops two attempts from running in parallel, the first one to finish wins,
> the loser's side-effects are removed).
>
> This is how the system works fast without race conditions.
>
> Cheers,
> Gopal
>
>
>
>
>
> On Mon, Jul 21, 2014 at 10:15 PM, Thaddeus Diamond <
> [email protected]> wrote:
>
> Okay cool that makes sense. I will look into the OutputCommitter and
> OutputFormat plus upgrade to 0.5.
>
> Next question: In MR the OutputCollector provides a native way to do
> global sorting just by specifying the output key. Is there a way with the
> OutputCommitter or OutputFormat to do that? Basically I just want a way to
> do key-value output and have it be HDFS-sorted on disk.
>
> Thanks,
> Thad
>
>
> On Mon, Jul 21, 2014 at 2:02 PM, Hitesh Shah <[email protected]> wrote:
>
> Hi Thad,
>
> With respect to speculation related race conditions, the condition can
> hold true even if speculation is disabled. A task attempt may lose
> connection to the AM and a new attempt launched ( even though the earlier
> attempt continues to run). In either scenario, an Output has access to a
> canCommit() API that should be invoked from the task runtime. This does a
> check with the AM whether it is the valid attempt before it “commits” its
> data. For an HDFS based output, it would usually write all its data into a
> “/taskId/attemptId/” sub-dir and move its data into the parent “taskId” dir
> if the canCommit call returns true.
>
> thanks
> — Hitesh
>
> On Jul 21, 2014, at 10:51 AM, Bikas Saha <[email protected]> wrote:
>
> > If it’s the race condition that you are worried about then you should
> look at o.a.t.r.a. OutputCommitter.java
> >
> > Whenever an output is specified, one can also specify a committer. The
> committer is executed after all the outputs have been written (at the end
> of that vertex completion or at the end of dag completion). Its job is to
> “commit” the output in a way that makes sense for that output. E.g. for
> HDFS or any Hadoop Filesystem output, users typically specify a
> FileOutputCommitter. It works in conjunction with the FileOutputFormat.
> FileOutputFormat writes the individual task outputs to sub-dirs of the
> actual output dir, thus avoiding collisions between speculative executions
> or re-executions. In the final output commit phase, the valid output files
> are moved to the actual output dir and the invalid ones are deleted.
> >
> > They are available in 0.5. If you are still in the POC phase of your
> project, its recommended to work with the 0.5 (unreleased) as the APIs have
> been heavily cleaned up and simplified. Look at
> OrderedPartitionedKVEdgeConfigurer or UnorderedPartitionedKVEdgeConfigurer.
> >
> > From: Thaddeus Diamond [mailto:[email protected]]
> > Sent: Sunday, July 20, 2014 9:02 PM
> > To: [email protected]
> > Cc: [email protected]
> > Subject: Re: Writing to HDFS from an Output
> >
> > Okay so this means potentially this COULD be a race condition (though
> presumably if you disable the speculative execution via conf it would go
> away). Would switching over to the new OutputFormat API solve this issue
> even if I don't use OutputCollector? I do want to be able to leverage SE
> when it's implemented.
> >
> > Quick aside: Is PartitionedKeyValue in 0.4.1? I am on branch
> 0.4.1-incubating (79997ff). Couldn't find it.
> >
> > Thanks,
> > Thad
> >
> > On Sun, Jul 20, 2014 at 8:08 PM, Bikas Saha <[email protected]>
> wrote:
> > The approach is correct from a purist point of view.
> >
> > Since Tez is data-type agnostic, there is no higher level entity for
> handling logical data in Tez directly. However, input/output
> implementations may provide them where it makes sense. Eg. The
> PartitionedKeyValue outputs allow the specification of a partitioner that
> can partition key value data by the key.
> >
> > The MRHelper methods are mainly to help with MR compatibility, though
> some of them are generic KeyValue helper methods that may be moved into a
> Tez (non-MR) helper utility. So depending on the method you are using, you
> may still be native Tez.
> >
> > IMO, MRInput may fall in the same vein. When dealing with KeyValue data
> types from disparate sources, the InputFormat and OutputFormat layers form
> a useful abstraction to do that translation. That’s why we decided to
> include support for them instead of re-defining that translation layer.
> This way we can leverage all the existing implementations of getting KV
> data from HDFS/S3/LocalFiles/ZippedFiles/Text/RC etc.
> >
> > Speculation support is not there but tracked as a work item for a near
> term release, may be 0.6.
> >
> > Bikas
> >
> > From: Thaddeus Diamond [mailto:[email protected]]
> > Sent: Sunday, July 20, 2014 4:27 PM
> > To: [email protected]
> > Subject: Writing to HDFS from an Output
> >
> > Hi,
> >
> > I'm trying to create a simple I/P/O to do the following:
> >
> > Input -> Generate data from Java objects (for now, just random strings)
> > Processor -> Bucket those strings into output groups
> > Output -> Write each output group bucket to HDFS in a different file,
> under the same subdirectory
> >
> > I have Input and Processor classes are uninteresting in this example.
> The Output I've created (MyLogicalOutput) implements LogicalOutput and
> creates a new file directly in HDFS using the Java API. This returns an
> FSDataOutputStream, which it then writes to.
> >
> > My question is this: is this the correct paradigm? I wondered if there
> were any native Tez abstractions like the OutputCollector in MR.
> >
> > Also, does Tez have speculative execution that could cause race
> conditions?
> >
> > I don't want to use MRInput or any of the MRHelpers methods to
> translate an existing MR job, I want this to be native Tez.
> >
> > Thanks!
> > Thad
> >
> > CONFIDENTIALITY NOTICE
> > NOTICE: This message is intended for the use of the individual or
> entity to which it is addressed and may contain information that is
> confidential, privileged and exempt from disclosure under applicable law.
> If the reader of this message is not the intended recipient, you are hereby
> notified that any printing, copying, dissemination, distribution,
> disclosure or forwarding of this communication is strictly prohibited. If
> you have received this communication in error, please contact the sender
> immediately and delete it from your system. Thank You.
> >
> >
> > CONFIDENTIALITY NOTICE
> > NOTICE: This message is intended for the use of the individual or
> entity to which it is addressed and may contain information that is
> confidential, privileged and exempt from disclosure under applicable law.
> If the reader of this message is not the intended recipient, you are hereby
> notified that any printing, copying, dissemination, distribution,
> disclosure or forwarding of this communication is strictly prohibited. If
> you have received this communication in error, please contact the sender
> immediately and delete it from your system. Thank You.
>
>
>
>
>
>
>
>
>
>
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity
> to which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader
> of this message is not the intended recipient, you are hereby notified that
> any printing, copying, dissemination, distribution, disclosure or
> forwarding of this communication is strictly prohibited. If you have
> received this communication in error, please contact the sender immediately
> and delete it from your system. Thank You.
>