Raajay, I was able to locate a temporary patch which provided this functionality for broadcast. I don't think the patch will apply to the current code base, but it should give an idea of what needs to be done. I'll upload this in a bit to TEZ-2442. For the regular OrderedOutput-Input pair, you'll need to look at DefaultSorter/PipelinedSorter, FetcherOrderedGrouped, ShuffleScheduler, and TezSpillRecord.
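At a very high level, the fetcher-side change is to skip the HTTP round trip and open the partition data directly from a shared filesystem at a path both sides agree on. A minimal sketch of that idea follows - the class name and path layout here are invented for illustration and do not match the actual patch:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative only: open an intermediate partition directly from a
    // shared filesystem (HDFS, or Tachyon via its HDFS-compatible URI)
    // instead of fetching it over HTTP from the upstream node.
    public class SharedFsFetch {
      // Hypothetical layout; the real code must agree with whatever the
      // producer side writes (see TezSpillRecord for the current format).
      static InputStream openPartition(Configuration conf, String baseDir,
          String dagId, String srcTaskId, int partition) throws IOException {
        Path spill = new Path(baseDir,
            dagId + "/" + srcTaskId + "/output_" + partition + ".out");
        FileSystem fs = spill.getFileSystem(conf);
        return fs.open(spill); // replaces the HTTP fetch in the fetcher
      }
    }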
HTH
- Sid

On Thu, Dec 10, 2015 at 12:59 PM, Raajay <[email protected]> wrote:

> I am looking to hack something up quickly to see if there is any performance improvement from using in-memory lookup for intermediate data.
>
> @Siddharth: I am not well versed in the Tez code base. Which packages (source classes) should I be looking at to implement the hack you suggested?
>
> Thanks,
> Raajay
>
> On Wed, Dec 9, 2015 at 4:30 PM, Bikas Saha <[email protected]> wrote:
>
>> If you want to go down this path, it may be easier for you to follow Sid's comments about changing the existing intermediate Inputs/Outputs to make this happen. That may be far less work than writing your own IOs from scratch and dealing with DataMovementEvents and other details. This would be the way forward if you want to quickly hack up something to experiment with or demo.
>>
>> However, if you are looking at this for something more stable and long term, then the following plan might be an overall better approach. Today the intermediate-data Tez Inputs and Outputs (OrderedGroupedKVInput, OrderedPartitionedKVOutput, etc.) do two things:
>>
>> 1) Logical data partitioning for outputs and logical data merging for inputs.
>>
>> 2) Physical data writing for outputs and physical data reading for inputs.
>>
>> In the above, 1) is a common operation because it is logical and has identical semantics regardless of environment, while 2) is potentially a pluggable, environment-specific activity. So consider a project where we refactor 2) in our existing Inputs/Outputs to put it behind an interface, with the default being the current local-file writer and HTTP reader. Then, going forward, the core logical part of these Inputs/Outputs becomes reusable and retargetable to different physical targets - e.g. in-memory HDFS, Tachyon, NFS, or S3.
>>
>> Thoughts?
>>
>> Bikas
>>
>> From: Raajay [mailto:[email protected]]
>> Sent: Tuesday, December 8, 2015 5:50 PM
>> To: [email protected]
>> Subject: Re: Writing intermediate data
>>
>> Thanks for the valuable inputs.
>>
>> A quick clarification:
>>
>> "- Tez uses DataMovementEvents to inform the downstream vertex where to pull data from. This information handshake is part of the Input/Output pair implementation."
>>
>> If the edges had type PERSISTED_RELIABLE, the information handshake is probably not needed. Is that right?
>>
>> - Raajay
>>
>> On Tue, Dec 8, 2015 at 6:17 PM, Hitesh Shah <[email protected]> wrote:
>>
>> The other way to look at this problem is that, for a given edge between two vertices, the data format and transfer mechanism are governed by the Output of the upstream vertex and the Input of the downstream vertex. You can potentially write your own Input and Output pair that work with HDFS or Tachyon for intermediate data, but there are a few things to be aware of:
>>
>> - Depending on the operator, the data is expected to be potentially partitioned and/or sorted. This will drive how you store and read data.
>>
>> - Tez uses DataMovementEvents to inform the downstream vertex where to pull data from. This information handshake is part of the Input/Output pair implementation.
>>
>> - The failure semantics also change depending on the kind of storage. Today, most edges use type PERSISTED, which means the data can survive the container going away but not the node/machine disappearing. If using HDFS, that would become type PERSISTED_RELIABLE, which means the data is always reliably available (even if the node on which the data was generated disappears). I don't believe this is handled yet, so that would be a new enhancement to Tez to fix the failure semantics for such an edge.
>>
>> If you are using Hive, this would mean making changes to Hive too, to change the DAG plan as needed.
>>
>> thanks
>> - Hitesh
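To make the refactoring Bikas describes above concrete: his point 2) would end up hidden behind something like the interface sketched below. None of these names exist in Tez today - this is only the shape of the idea, with the default implementation being the current local-file writer plus HTTP reader:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Invented names, illustration only: a pluggable physical-transport
    // interface that point 2) could sit behind. Implementations could
    // target local disk + HTTP (today's default), HDFS, Tachyon, NFS, S3.
    interface PhysicalShuffleStore {
      // Producer side: where a task writes the bytes for one partition.
      OutputStream createPartitionWriter(String taskId, int partition)
          throws IOException;

      // Consumer side: how the downstream task reads that partition back.
      InputStream openPartitionReader(String taskId, int partition)
          throws IOException;

      // Distinguishes PERSISTED from PERSISTED_RELIABLE edges, per
      // Hitesh's note on failure semantics.
      boolean survivesNodeLoss();
    }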
>> On Dec 8, 2015, at 3:54 PM, Siddharth Seth <[email protected]> wrote:
>>
>> > Using HDFS (or a filesystem other than the local one) is not supported yet. tmpfs would be your best bet in that case - we have tested with it before - but it has capacity limitations, and mixing tmpfs with regular disks does not provide a deterministic mechanism for selecting memory as the intermediate storage.
>> > I'm not sure whether Tachyon has an NFS interface to access it - otherwise that could have been an option.
>> >
>> > We have made simple changes in the past to use HDFS for shuffle, primarily as experiments. None of that is available as patches, but IIRC the changes were not very complicated. They would involve changing the fetcher to skip HTTP and use a pre-determined path on a specified filesystem to fetch data, and changing the producer to write out to a specific path on a non-local FileSystem.
>> >
>> > On Mon, Dec 7, 2015 at 11:57 AM, Raajay <[email protected]> wrote:
>> > I wish to set up a Tez data-analysis framework where the data resides in memory. Currently, I have Tez (and also Hive) set up so that it can read from an in-memory filesystem like Tachyon.
>> >
>> > However, the intermediate data is still written to disk at each processing node. I considered writing to tmpfs; however, such a setup does not fall back to disk gracefully.
>> >
>> > Does Tez have an interface for writing intermediate data to an HDFS-like filesystem? If yes, what are the settings?
>> >
>> > Does setting "yarn.nodemanager.local-dirs" to some HDFS or Tachyon URI suffice?
>> >
>> > Thanks,
>> > Raajay
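For completeness, the producer-side counterpart of the fetch sketch above would write the merged spill to the same agreed-upon shared path instead of the node-local dirs - again purely illustrative, mirroring the hypothetical layout used earlier:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative producer-side counterpart to the fetch sketch above.
    public class SharedFsWrite {
      static void writePartition(Configuration conf, String baseDir,
          String dagId, String srcTaskId, int partition, byte[] data)
          throws IOException {
        Path spill = new Path(baseDir,
            dagId + "/" + srcTaskId + "/output_" + partition + ".out");
        FileSystem fs = spill.getFileSystem(conf);
        // In the real sorter this would be the merged spill stream rather
        // than a byte array; on HDFS the written data survives node loss.
        try (FSDataOutputStream out = fs.create(spill, true)) {
          out.write(data);
        }
      }
    }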
