Raajay, I was able to locate a temporary patch which provided this
functionality for broadcast. I don't think the patch will apply to the
current code base, but should provide an idea around what needs to be done.
I'll upload this in a bit to TEZ-2442.
For the regular OrderedOutput-Input pair - you'll need to look at
DefaultSorter/PipelinedSorter, FetcherOrderedGrouped, ShuffleScheduler,
TezSpillRecord.

HTH
- Sid

On Thu, Dec 10, 2015 at 12:59 PM, Raajay <[email protected]> wrote:

> I am looking to hack something up quick to see if there is any performance
> improvement by using in-memory lookup for intermediate data.
>
> @Siddarth: I am not well versed with Tez code base. Which packages (source
> class) should I be looking at to implement the hack you suggested ?
>
> Thanks,
> Raajay.
>
> On Wed, Dec 9, 2015 at 4:30 PM, Bikas Saha <[email protected]> wrote:
>
>> If you want to go down this path then it may be easier for you to follow
>> Sid’s comments about changing the existing intermediate Inputs/Outputs in
>> order to make this happen. That may be far less work than writing your own
>> IOs from scratch and dealing with Datamovement events and other details.
>> This would be the way forward if you want to quickly hack up something to
>> experiment with or demo.
>>
>>
>>
>> However, if you are looking at this for something more stable and long
>> term, then the following plan might be an overall better approach. Today
>> the intermediate data Tez Inputs and Outputs (OrderedGroupedKVInput,
>> OrderedPartitionedKVOutput etc.) are doing 2 things
>>
>> 1)      Logical data partitioning for outputs and logical data merging
>> for inputs
>>
>> 2)      Physical data writing for outputs and physical data reading for
>> inputs.
>>
>> In the above, 1) is a common operation because its logical and has
>> identical semantics regardless of environment while 2) is potentially a
>> pluggable environment specific activity. So we consider a project where we
>> refactor 2) in the our existing Inputs/Outputs to put them behind an
>> interface with the default being the current local file writer and HTTP
>> reader. Then, going forward the core logical part of these Inputs/Outputs
>> becomes reusable and retargetable to different physical targets – e.g.
>> in-memory HDFS, or Tachyon or NFS or S3 etc.
>>
>>
>>
>> Thoughts?
>>
>> Bikas
>>
>>
>>
>> *From:* Raajay [mailto:[email protected]]
>> *Sent:* Tuesday, December 8, 2015 5:50 PM
>> *To:* [email protected]
>> *Subject:* Re: Writing intermediate data
>>
>>
>>
>> Thanks for the valuable inputs.
>>
>>
>>
>> A quick clarification :
>>
>>
>>
>> " - Tez uses DataMovementEvents to inform the downstream vertex on where
>> to pull data from. This information handshake is part of the Input/Output
>> pair implementation."
>>
>>
>>
>> If the edges had type PERSISTED_RELIABLE, the information handshake is
>> probably not needed. Is that right ?
>>
>>
>>
>> - Raajay
>>
>>
>>
>> On Tue, Dec 8, 2015 at 6:17 PM, Hitesh Shah <[email protected]> wrote:
>>
>> The other way to look at this problem is that for a given edge between 2
>> vertices, the data format and transfer mechanism is governed by the Output
>> of the upstream vertex and the Input of the downstream vertex. You can
>> potentially write your own Input and Output pair that work with HDFS or
>> tachyon for intermediate data but there are a few things to be aware of:
>>    - Depending on the operator, the data is expected to be potentially
>> partitioned and/or sorted. This will drive how you store and read data
>>    - Tez uses DataMovementEvents to inform the downstream vertex on where
>> to pull data from. This information handshake is part of the Input/Output
>> pair implementation.
>>    - The failure systems also change depending on the kind of storage.
>> Today, most edges uses type PERSISTED. This means that the data can survive
>> the container going away but not if the node/machine disappears. If using
>> HDFS, that would become type PERSISTED_RELIABLE. This means that the data
>> is always reliably available ( even if the node on which the data was
>> generated disappears ). I don’t believe this is handled yet so that would
>> be a new enhancement to Tez to fix the failure semantics for such an edge.
>>
>> If you are using Hive, this would mean making changes to Hive too to
>> change the DAG plan as needed.
>>
>> thanks
>> — Hitesh
>>
>>
>>
>> On Dec 8, 2015, at 3:54 PM, Siddharth Seth <[email protected]> wrote:
>>
>> > Using hdfs (or a filesystem other than local) is not supported yet.
>> tmpfs would be your best bet in that case - we have tested with this
>> before, but this has capacity limitations, and mixing tmpfs with regular
>> disks does not provide a deterministic mechanism of selecting memory as the
>> intermediate storage.
>> > Not sure if tachyon has an nfs interface to access it - otherwise that
>> could have been an option.
>> >
>> > We have made simple changes in the past to use HDFS for shuffle -
>> primarily as experiments. None of that is available as patches, but IIRC -
>> the changes were not very complicated. This would involve changing the
>> fetcher to skip HTTP and use a pre-determined path on a specified
>> filesystem to fetch data. Also, the producer to write out to a specific
>> path on a non-local FileSystem.
>> >
>> > On Mon, Dec 7, 2015 at 11:57 AM, Raajay <[email protected]> wrote:
>> > I wish to setup a Tez data analysis framework, where the data resides
>> in memory. Currently, I have tez (and also Hive) setup such that it can
>> read from an in-memory filesystem like Tachyon.
>> >
>> > However, the intermediate data is still written to disk at the each
>> processing node. I considered writing to tmpfs, however, such a setup does
>> not fall back to disk gracefully.
>> >
>> > Does Tez have an interface to write intermediate data to HDFS like
>> filesystem ? If yes, what are the settings ?
>> >
>> > Does setting "yarn.nodemanager.local-dirs" to some HDFS or Tachyon URI
>> suffice ?
>> >
>> > Thanks,
>> > Raajay
>> >
>>
>>
>>
>
>

Reply via email to