Okay this looks good.  Since I'm doing a move on commit() then the close()
method in the output would be a no-op (this seems fine).

Thanks!
Thad


On Wed, Jul 23, 2014 at 5:37 PM, Bikas Saha <[email protected]> wrote:

> The processor is expected to get the commit permission from the AM (be
> told it’s the valid task) and then call output.commit().
>
>
>
> The framework calls output.close() upon task completion irrespective of
> whether the task succeeded/failed/killed/could-not-commit. close() is for
> cleanup and not for commit.
>
>
>
> Bikas
>
>
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Wednesday, July 23, 2014 1:25 PM
> *To:* [email protected]
> *Cc:* Gopal V
>
> *Subject:* Re: Writing to HDFS from an Output
>
>
>
> Also, can the LogicalOutput figure out if it is still valid?  I want to
> commit() as the very last thing in LogicalOutput#close().
>
>
>
> On Wed, Jul 23, 2014 at 12:56 PM, Thaddeus Diamond <
> [email protected]> wrote:
>
> Oh wait, OutputCommitter is called from the AM looking at the javadoc.
>  Does it have a list of successful attempt id's or something that could
> allow it to iterate through batches and move files?
>
>
>
> On Tue, Jul 22, 2014 at 4:13 PM, Thaddeus Diamond <
> [email protected]> wrote:
>
> Okay I'll look at that.  Continuing down the thought of an
> OutputCommitter, what happens if my code looks as follows:
>
>
>
> Processor
>
> --------------
>
> if (context.canCommit()) {
>
>   outputCommitter.commit();
>
> }
>
>
>
> OutputCommitter
>
> --------------
>
> void commit() {
>
>   hdfsClient.move(temporaryFile, finalDestination);
>
> }
>
>
>
> What happens if the JVM crashes between the if statement and the commit()
> function call.  Will any other tasks be able to commit?  Will my job just
> fail or will there be incomplete results?
>
>
>
> On Tue, Jul 22, 2014 at 1:21 PM, Bikas Saha <[email protected]> wrote:
>
> That’s correct. That’s why you should probably use MROutput to write to
> HDFS if you data is already Key-Value. Don’t think of MROutput as MR
> compatibility. Its more like a Key-Value data writer using the OutputFormat
> abstraction. We should perhaps change the name of MROutput to remove the MR
> connotation.
>
>
>
> Of course, if your data is not KeyValue or you have some other special
> needs, then yes you need to write your own output.
>
>
>
> Bikas
>
>
>
> *From:* Thaddeus Diamond [mailto:[email protected]]
> *Sent:* Tuesday, July 22, 2014 6:53 AM
> *To:* Gopal V
> *Cc:* [email protected]
>
>
> *Subject:* Re: Writing to HDFS from an Output
>
>
>
> Sorry, see the class now.  My LogicalOutput would also have to implement
> OutputCommitter or presumably have a helper class that does.
>
>
>
> On Tue, Jul 22, 2014 at 9:49 AM, Thaddeus Diamond <
> [email protected]> wrote:
>
> I see.  Now when you say "both output operations write to unique
> directories", this appears to be something I would have to enforce if I
> wrote my own LogicalOutput class, correct?  Presumably I could add a
> commit() method to my LogicalOutput implementation that would actually
> perform the atomic HDFS move when the processor calls it from its close()
> method.  Is that what you would suggest?
>
>
>
> On Tue, Jul 22, 2014 at 2:38 AM, Gopal V <[email protected]> wrote:
>
> On 7/22/14, 8:19 AM, Thaddeus Diamond wrote:
>
> Also is OnFileSortedOutput allowed to be terminal?  And is it immune to the
> race conditions described?  It seems like flush() is actually writing to a
> file stream on HDFS, without using canCommit(), which would have the
> aforementioned problems.
>
>
>
> OnFileSortedOutput is not terminal, it is a local file output mode which
> writes to the local filesystem, not HDFS. That serves as the connecting
> edge between two vertices and it is not even total order sorted.
>
> The scatter-gather edge does total order, if you use a
> TotalOrderPartitioner (or in MR-like terms "a reducer").
>
> You need something like MROutput, connected as a terminal operator after
> that total order scatter-gather (i.e send all partition0 to reducer0, have
> it write 00000_0).
>
> But for the sake of clarity, here is a vague simplification of how we
> avoid race conditions in output phase.
>
> There is a transactional system, which is similar to lock elision.
>
> There is no difference between how that works between task failure
> scenarios (split network, for instance) or speculation. It will be used if
> there are more than one attempt for a given task.
>
> The interface to that is called Task::canCommit(). It will return "true"
> for the very first task which gets there and false for every attempt after
> that (well, that's a simplification, but it sort of will do for now).
>
> To prevent clobbering each other's output files when 2 attempts are in
> flight (due to speculation or partial writes due to failure), both output
> operations write to unique directories.
>
> The unique directory name is something like
> <dest-dir>/_temporary/<task>/<attempt>/
>
> To commit an attempt as the final output, files from its temp dir will be
> moved to the final output location.
>
> For people more familiar with native locking, this is an elision with a
> visibility criteria instead of being truly mutual exclusion (i.e nothing
> stops two attempts from running in parallel, the first one to finish wins,
> the loser's side-effects are removed).
>
> This is how the system works fast without race conditions.
>
> Cheers,
> Gopal
>
>
>
>
>
> On Mon, Jul 21, 2014 at 10:15 PM, Thaddeus Diamond <
> [email protected]> wrote:
>
> Okay cool that makes sense.  I will look into the OutputCommitter and
> OutputFormat plus upgrade to 0.5.
>
> Next question: In MR the OutputCollector provides a native way to do
> global sorting just by specifying the output key.  Is there a way with the
> OutputCommitter or OutputFormat to do that?  Basically I just want a way to
> do key-value output and have it be HDFS-sorted on disk.
>
> Thanks,
> Thad
>
>
> On Mon, Jul 21, 2014 at 2:02 PM, Hitesh Shah <[email protected]> wrote:
>
> Hi Thad,
>
> With respect to speculation related race conditions, the condition can
> hold true even if speculation is disabled. A task attempt may lose
> connection to the AM and a new attempt launched ( even though the earlier
> attempt continues to run). In either scenario, an Output has access to a
> canCommit() API that should be invoked from the task runtime. This does a
> check with the AM whether it is the valid attempt before it “commits” its
> data. For an HDFS based output, it would usually write all its data into a
> “/taskId/attemptId/” sub-dir and move its data into the parent “taskId” dir
> if the canCommit call returns true.
>
> thanks
> — Hitesh
>
> On Jul 21, 2014, at 10:51 AM, Bikas Saha <[email protected]> wrote:
>
> > If it’s the race condition that you are worried about then you should
> look at o.a.t.r.a. OutputCommitter.java
> >
> > Whenever an output is specified, one can also specify a committer. The
> committer is executed after all the outputs have been written (at the end
> of that vertex completion or at the end of dag completion). Its job is to
> “commit” the output in a way that makes sense for that output. E.g. for
> HDFS or any Hadoop Filesystem output, users typically specify a
> FileOutputCommitter. It works in conjunction with the FileOutputFormat.
> FileOutputFormat writes the individual task outputs to sub-dirs of the
> actual output dir, thus avoiding collisions between speculative executions
> or re-executions. In the final output commit phase, the valid output files
> are moved to the actual output dir and the invalid ones are deleted.
> >
> > They are available in 0.5. If you are still in the POC phase of your
> project, its recommended to work with the 0.5 (unreleased) as the APIs have
> been heavily cleaned up and simplified. Look at
> OrderedPartitionedKVEdgeConfigurer or UnorderedPartitionedKVEdgeConfigurer.
> >
> > From: Thaddeus Diamond [mailto:[email protected]]
> > Sent: Sunday, July 20, 2014 9:02 PM
> > To: [email protected]
> > Cc: [email protected]
> > Subject: Re: Writing to HDFS from an Output
> >
> > Okay so this means potentially this COULD be a race condition (though
> presumably if you disable the speculative execution via conf it would go
> away).  Would switching over to the new OutputFormat API solve this issue
> even if I don't use OutputCollector?  I do want to be able to leverage SE
> when it's implemented.
> >
> > Quick aside: Is PartitionedKeyValue in 0.4.1?  I am on branch
> 0.4.1-incubating (79997ff).  Couldn't find it.
> >
> > Thanks,
> > Thad
> >
> > On Sun, Jul 20, 2014 at 8:08 PM, Bikas Saha <[email protected]>
> wrote:
> > The approach is correct from a purist point of view.
> >
> > Since Tez is data-type agnostic, there is no higher level entity for
> handling logical data in Tez directly. However, input/output
> implementations may provide them where it makes sense. Eg. The
> PartitionedKeyValue outputs allow the specification of a partitioner that
> can partition key value data by the key.
> >
> > The MRHelper methods are mainly to help with MR compatibility, though
> some of them are generic KeyValue helper methods that may be moved into a
> Tez (non-MR) helper utility. So depending on the method you are using, you
> may still be native Tez.
> >
> > IMO, MRInput may fall in the same vein. When dealing with KeyValue data
> types from disparate sources, the InputFormat and OutputFormat layers form
> a useful abstraction to do that translation. That’s why we decided to
> include support for them instead of re-defining that translation layer.
> This way we can leverage all the existing implementations of getting KV
> data from HDFS/S3/LocalFiles/ZippedFiles/Text/RC etc.
> >
> > Speculation support is not there but tracked as a work item for a near
> term release, may be 0.6.
> >
> > Bikas
> >
> > From: Thaddeus Diamond [mailto:[email protected]]
> > Sent: Sunday, July 20, 2014 4:27 PM
> > To: [email protected]
> > Subject: Writing to HDFS from an Output
> >
> > Hi,
> >
> > I'm trying to create a simple I/P/O to do the following:
> >
> > Input -> Generate data from Java objects (for now, just random strings)
> > Processor -> Bucket those strings into output groups
> > Output -> Write each output group bucket to HDFS in a different file,
> under the same subdirectory
> >
> > I have Input and Processor classes are uninteresting in this example.
>  The Output I've created (MyLogicalOutput) implements LogicalOutput and
> creates a new file directly in HDFS using the Java API.  This returns an
> FSDataOutputStream, which it then writes to.
> >
> > My question is this: is this the correct paradigm?  I wondered if there
> were any native Tez abstractions like the OutputCollector in MR.
> >
> > Also, does Tez have speculative execution that could cause race
> conditions?
> >
> > I don't want to use MRInput or any of the MRHelpers methods to
> translate an existing MR job, I want this to be native Tez.
> >
> > Thanks!
> > Thad
> >
> > CONFIDENTIALITY NOTICE
> > NOTICE: This message is intended for the use of the individual or
> entity to which it is addressed and may contain information that is
> confidential, privileged and exempt from disclosure under applicable law.
> If the reader of this message is not the intended recipient, you are hereby
> notified that any printing, copying, dissemination, distribution,
> disclosure or forwarding of this communication is strictly prohibited. If
> you have received this communication in error, please contact the sender
> immediately and delete it from your system. Thank You.
> >
> >
> > CONFIDENTIALITY NOTICE
> > NOTICE: This message is intended for the use of the individual or
> entity to which it is addressed and may contain information that is
> confidential, privileged and exempt from disclosure under applicable law.
> If the reader of this message is not the intended recipient, you are hereby
> notified that any printing, copying, dissemination, distribution,
> disclosure or forwarding of this communication is strictly prohibited. If
> you have received this communication in error, please contact the sender
> immediately and delete it from your system. Thank You.
>
>
>
>
>
>
>
>
>
>
>
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity
> to which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader
> of this message is not the intended recipient, you are hereby notified that
> any printing, copying, dissemination, distribution, disclosure or
> forwarding of this communication is strictly prohibited. If you have
> received this communication in error, please contact the sender immediately
> and delete it from your system. Thank You.
>
>
>
>
>
>
>
> CONFIDENTIALITY NOTICE
> NOTICE: This message is intended for the use of the individual or entity
> to which it is addressed and may contain information that is confidential,
> privileged and exempt from disclosure under applicable law. If the reader
> of this message is not the intended recipient, you are hereby notified that
> any printing, copying, dissemination, distribution, disclosure or
> forwarding of this communication is strictly prohibited. If you have
> received this communication in error, please contact the sender immediately
> and delete it from your system. Thank You.
>

Reply via email to