Sorry, I see the class now.  My LogicalOutput would also have to implement
OutputCommitter or presumably have a helper class that does.
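
For reference, the pattern described further down the thread (write into a
per-attempt directory, ask canCommit(), then move the files) can be sketched
roughly as below. This is only a minimal sketch under assumptions: it uses
java.nio.file on a local filesystem as a stand-in for HDFS, and
AttemptCommitSketch, CommitArbiter, AttemptOutput and runDemo are hypothetical
names, not Tez or Hadoop classes. The in-memory map merely imitates the
"first attempt to ask wins" behaviour of canCommit(); the real check goes to
the AM.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch only: none of these classes are Tez or Hadoop APIs.
public class AttemptCommitSketch {

    // Stand-in for the AM-side check behind canCommit(): first attempt to ask wins.
    static final class CommitArbiter {
        private final ConcurrentMap<String, String> winners = new ConcurrentHashMap<>();

        boolean canCommit(String taskId, String attemptId) {
            // putIfAbsent returns null only for the first attempt registered for a task.
            String winner = winners.putIfAbsent(taskId, attemptId);
            return winner == null || winner.equals(attemptId);
        }
    }

    // One task attempt writing under <destDir>/_temporary/<task>/<attempt>/.
    static final class AttemptOutput {
        private final Path destDir;
        private final Path attemptDir;
        private final String taskId;
        private final String attemptId;

        AttemptOutput(Path destDir, String taskId, String attemptId) throws IOException {
            this.destDir = destDir;
            this.taskId = taskId;
            this.attemptId = attemptId;
            this.attemptDir = destDir.resolve("_temporary").resolve(taskId).resolve(attemptId);
            Files.createDirectories(attemptDir);
        }

        void write(String fileName, String contents) throws IOException {
            Files.write(attemptDir.resolve(fileName), contents.getBytes(StandardCharsets.UTF_8));
        }

        // Winner: move files into destDir. Loser: delete its files. Returns true for the winner.
        boolean commit(CommitArbiter arbiter) throws IOException {
            boolean won = arbiter.canCommit(taskId, attemptId);
            List<Path> files;
            try (Stream<Path> listing = Files.list(attemptDir)) {
                files = listing.collect(Collectors.toList());
            }
            for (Path file : files) {
                if (won) {
                    Files.move(file, destDir.resolve(file.getFileName()),
                            StandardCopyOption.ATOMIC_MOVE);
                } else {
                    Files.delete(file);
                }
            }
            Files.delete(attemptDir); // the attempt directory is empty at this point
            return won;
        }
    }

    // Runs two attempts of the same task and returns the committed file's contents.
    static String runDemo() {
        try {
            Path dest = Files.createTempDirectory("commit-sketch");
            CommitArbiter arbiter = new CommitArbiter();
            AttemptOutput first = new AttemptOutput(dest, "task_0", "attempt_0");
            AttemptOutput second = new AttemptOutput(dest, "task_0", "attempt_1");
            first.write("00000_0", "from attempt_0");
            second.write("00000_0", "from attempt_1");
            boolean secondWon = second.commit(arbiter); // reaches commit first, so it wins
            boolean firstWon = first.commit(arbiter);   // arrives later, so it is discarded
            if (!secondWon || firstWon) {
                throw new IllegalStateException("unexpected commit outcome");
            }
            return new String(Files.readAllBytes(dest.resolve("00000_0")), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Both attempts write the same file name without clobbering each other because
each writes under its own attempt directory. Only the attempt that wins the
canCommit() check has its file moved into the destination. Note that each
move here is atomic per file, not for the whole set of files.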


On Tue, Jul 22, 2014 at 9:49 AM, Thaddeus Diamond <
[email protected]> wrote:

> I see.  Now when you say "both output operations write to unique
> directories", this appears to be something I would have to enforce if I
> wrote my own LogicalOutput class, correct?  Presumably I could add a
> commit() method to my LogicalOutput implementation that would actually
> perform the atomic HDFS move when the processor calls it from its close()
> method.  Is that what you would suggest?
>
>
> On Tue, Jul 22, 2014 at 2:38 AM, Gopal V <[email protected]> wrote:
>
>> On 7/22/14, 8:19 AM, Thaddeus Diamond wrote:
>>
>>> Also is OnFileSortedOutput allowed to be terminal?  And is it immune to
>>> the race conditions described?  It seems like flush() is actually writing
>>> to a file stream on HDFS, without using canCommit(), which would have the
>>> aforementioned problems.
>>>
>>
>> OnFileSortedOutput is not terminal. It is a local file output mode which
>> writes to the local filesystem, not HDFS. It serves as the connecting
>> edge between two vertices and it is not even total order sorted.
>>
>> The scatter-gather edge does total order, if you use a
>> TotalOrderPartitioner (or in MR-like terms "a reducer").
>>
>> You need something like MROutput, connected as a terminal operator after
>> that total order scatter-gather (i.e. send all of partition0 to reducer0,
>> have it write 00000_0).
>>
>> But for the sake of clarity, here is a rough simplification of how we
>> avoid race conditions in the output phase.
>>
>> There is a transactional system, which is similar to lock elision.
>>
>> It works the same way for task failure scenarios (a split network, for
>> instance) and for speculation. It is used whenever there is more than one
>> attempt for a given task.
>>
>> The interface to that is called Task::canCommit(). It will return "true"
>> for the very first task which gets there and false for every attempt after
>> that (well, that's a simplification, but it sort of will do for now).
>>
>> To prevent clobbering each other's output files when 2 attempts are in
>> flight (due to speculation or partial writes due to failure), both output
>> operations write to unique directories.
>>
>> The unique directory name is something like
>> <dest-dir>/_temporary/<task>/<attempt>/
>>
>> To commit an attempt as the final output, files from its temp dir will be
>> moved to the final output location.
>>
>> For people more familiar with native locking, this is an elision with a
>> visibility criterion rather than true mutual exclusion (i.e. nothing
>> stops two attempts from running in parallel; the first one to finish wins
>> and the loser's side-effects are removed).
>>
>> This is how the system works fast without race conditions.
>>
>> Cheers,
>> Gopal
>>
>>
>>
>>>
>>> On Mon, Jul 21, 2014 at 10:15 PM, Thaddeus Diamond <
>>> [email protected]> wrote:
>>>
>>>  Okay cool that makes sense.  I will look into the OutputCommitter and
>>>> OutputFormat plus upgrade to 0.5.
>>>>
>>>> Next question: In MR the OutputCollector provides a native way to do
>>>> global sorting just by specifying the output key.  Is there a way with
>>>> the
>>>> OutputCommitter or OutputFormat to do that?  Basically I just want a
>>>> way to
>>>> do key-value output and have it be HDFS-sorted on disk.
>>>>
>>>> Thanks,
>>>> Thad
>>>>
>>>>
>>>> On Mon, Jul 21, 2014 at 2:02 PM, Hitesh Shah <[email protected]> wrote:
>>>>
>>>>  Hi Thad,
>>>>>
>>>>> With respect to speculation related race conditions, the condition can
>>>>> hold true even if speculation is disabled. A task attempt may lose its
>>>>> connection to the AM and a new attempt may be launched (even though the
>>>>> earlier attempt continues to run). In either scenario, an Output has
>>>>> access to a canCommit() API that should be invoked from the task runtime.
>>>>> This checks with the AM whether it is the valid attempt before it
>>>>> “commits” its data. For an HDFS based output, it would usually write all
>>>>> its data into a “/taskId/attemptId/” sub-dir and move its data into the
>>>>> parent “taskId” dir if the canCommit call returns true.
>>>>>
>>>>> thanks
>>>>> — Hitesh
>>>>>
>>>>> On Jul 21, 2014, at 10:51 AM, Bikas Saha <[email protected]>
>>>>> wrote:
>>>>>
>>>>> > If it’s the race condition that you are worried about then you should
>>>>> > look at o.a.t.r.a.OutputCommitter.java
>>>>> >
>>>>> > Whenever an output is specified, one can also specify a committer. The
>>>>> > committer is executed after all the outputs have been written (at the
>>>>> > end of that vertex completion or at the end of dag completion). Its job
>>>>> > is to “commit” the output in a way that makes sense for that output.
>>>>> > E.g. for HDFS or any Hadoop Filesystem output, users typically specify a
>>>>> > FileOutputCommitter. It works in conjunction with the FileOutputFormat.
>>>>> > FileOutputFormat writes the individual task outputs to sub-dirs of the
>>>>> > actual output dir, thus avoiding collisions between speculative
>>>>> > executions or re-executions. In the final output commit phase, the valid
>>>>> > output files are moved to the actual output dir and the invalid ones are
>>>>> > deleted.
>>>>> >
>>>>> > They are available in 0.5. If you are still in the POC phase of your
>>>>> > project, it's recommended to work with 0.5 (unreleased) as the APIs
>>>>> > have been heavily cleaned up and simplified. Look at
>>>>> > OrderedPartitionedKVEdgeConfigurer or
>>>>> > UnorderedPartitionedKVEdgeConfigurer.
>>>>> >
>>>>> > From: Thaddeus Diamond [mailto:[email protected]]
>>>>> > Sent: Sunday, July 20, 2014 9:02 PM
>>>>> > To: [email protected]
>>>>> > Cc: [email protected]
>>>>> > Subject: Re: Writing to HDFS from an Output
>>>>> >
>>>>> > Okay so this means potentially this COULD be a race condition (though
>>>>> presumably if you disable the speculative execution via conf it would
>>>>> go
>>>>> away).  Would switching over to the new OutputFormat API solve this
>>>>> issue
>>>>> even if I don't use OutputCollector?  I do want to be able to leverage
>>>>> SE
>>>>> when it's implemented.
>>>>> >
>>>>> > Quick aside: Is PartitionedKeyValue in 0.4.1?  I am on branch
>>>>> 0.4.1-incubating (79997ff).  Couldn't find it.
>>>>> >
>>>>> > Thanks,
>>>>> > Thad
>>>>> >
>>>>> > On Sun, Jul 20, 2014 at 8:08 PM, Bikas Saha <[email protected]>
>>>>> wrote:
>>>>> > The approach is correct from a purist point of view.
>>>>> >
>>>>> > Since Tez is data-type agnostic, there is no higher level entity for
>>>>> > handling logical data in Tez directly. However, input/output
>>>>> > implementations may provide them where it makes sense. E.g. the
>>>>> > PartitionedKeyValue outputs allow the specification of a partitioner
>>>>> > that can partition key value data by the key.
>>>>> >
>>>>> > The MRHelper methods are mainly to help with MR compatibility, though
>>>>> some of them are generic KeyValue helper methods that may be moved
>>>>> into a
>>>>> Tez (non-MR) helper utility. So depending on the method you are using,
>>>>> you
>>>>> may still be native Tez.
>>>>> >
>>>>> > IMO, MRInput may fall in the same vein. When dealing with KeyValue
>>>>> data
>>>>> types from disparate sources, the InputFormat and OutputFormat layers
>>>>> form
>>>>> a useful abstraction to do that translation. That’s why we decided to
>>>>> include support for them instead of re-defining that translation layer.
>>>>> This way we can leverage all the existing implementations of getting KV
>>>>> data from HDFS/S3/LocalFiles/ZippedFiles/Text/RC etc.
>>>>> >
>>>>> > Speculation support is not there yet but is tracked as a work item for
>>>>> > a near term release, maybe 0.6.
>>>>> >
>>>>> > Bikas
>>>>> >
>>>>> > From: Thaddeus Diamond [mailto:[email protected]]
>>>>> > Sent: Sunday, July 20, 2014 4:27 PM
>>>>> > To: [email protected]
>>>>> > Subject: Writing to HDFS from an Output
>>>>> >
>>>>> > Hi,
>>>>> >
>>>>> > I'm trying to create a simple I/P/O to do the following:
>>>>> >
>>>>> > Input -> Generate data from Java objects (for now, just random
>>>>> strings)
>>>>> > Processor -> Bucket those strings into output groups
>>>>> > Output -> Write each output group bucket to HDFS in a different file,
>>>>> under the same subdirectory
>>>>> >
>>>>> > My Input and Processor classes are uninteresting in this example. The
>>>>> > Output I've created (MyLogicalOutput) implements LogicalOutput and
>>>>> > creates a new file directly in HDFS using the Java API.  This returns
>>>>> > an FSDataOutputStream, which it then writes to.
>>>>> >
>>>>> > My question is this: is this the correct paradigm?  I wondered if
>>>>> there
>>>>> were any native Tez abstractions like the OutputCollector in MR.
>>>>> >
>>>>> > Also, does Tez have speculative execution that could cause race
>>>>> conditions?
>>>>> >
>>>>> > I don't want to use MRInput or any of the MRHelpers methods to
>>>>> translate an existing MR job, I want this to be native Tez.
>>>>> >
>>>>> > Thanks!
>>>>> > Thad
>>>>> >
>>>>> > CONFIDENTIALITY NOTICE
>>>>> > NOTICE: This message is intended for the use of the individual or
>>>>> entity to which it is addressed and may contain information that is
>>>>> confidential, privileged and exempt from disclosure under applicable
>>>>> law.
>>>>> If the reader of this message is not the intended recipient, you are
>>>>> hereby
>>>>> notified that any printing, copying, dissemination, distribution,
>>>>> disclosure or forwarding of this communication is strictly prohibited.
>>>>> If
>>>>> you have received this communication in error, please contact the
>>>>> sender
>>>>> immediately and delete it from your system. Thank You.
>>>>> >
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
