If you want to go down this path, it may be easier to follow Sid’s suggestion of changing the existing intermediate Inputs/Outputs. That may be far less work than writing your own IOs from scratch and dealing with DataMovementEvents and other details. This would be the way forward if you want to quickly hack something up to experiment with or demo.

However, if you are looking for something more stable and long term, the following plan might be a better overall approach. Today the intermediate-data Tez Inputs and Outputs (OrderedGroupedKVInput, OrderedPartitionedKVOutput etc.) do 2 things:
1) Logical data partitioning for outputs and logical data merging for inputs
2) Physical data writing for outputs and physical data reading for inputs

Of these, 1) is a common operation because it is logical and has identical semantics regardless of environment, while 2) is potentially a pluggable, environment-specific activity. So we could consider a project where we refactor 2) in our existing Inputs/Outputs to put it behind an interface, with the default being the current local file writer and HTTP reader. Then, going forward, the core logical part of these Inputs/Outputs becomes reusable and retargetable to different physical targets, e.g. in-memory HDFS, Tachyon, NFS or S3.

Thoughts?
Bikas

From: Raajay [mailto:[email protected]]
Sent: Tuesday, December 8, 2015 5:50 PM
To: [email protected]
Subject: Re: Writing intermediate data

Thanks for the valuable inputs. A quick clarification:

" - Tez uses DataMovementEvents to inform the downstream vertex on where to pull data from. This information handshake is part of the Input/Output pair implementation."

If the edges had type PERSISTED_RELIABLE, the information handshake is probably not needed. Is that right?

- Raajay

On Tue, Dec 8, 2015 at 6:17 PM, Hitesh Shah <[email protected]> wrote:

The other way to look at this problem is that for a given edge between 2 vertices, the data format and transfer mechanism are governed by the Output of the upstream vertex and the Input of the downstream vertex.

You can potentially write your own Input and Output pair that works with HDFS or Tachyon for intermediate data, but there are a few things to be aware of:

- Depending on the operator, the data is expected to be potentially partitioned and/or sorted. This will drive how you store and read data.
- Tez uses DataMovementEvents to inform the downstream vertex on where to pull data from. This information handshake is part of the Input/Output pair implementation.
- The failure semantics also change depending on the kind of storage. Today, most edges use type PERSISTED. This means that the data can survive the container going away, but not the node/machine disappearing. If using HDFS, that would become type PERSISTED_RELIABLE. This means that the data is always reliably available (even if the node on which the data was generated disappears). I don’t believe this is handled yet, so that would be a new enhancement to Tez to fix the failure semantics for such an edge.

If you are using Hive, this would mean making changes to Hive too, to change the DAG plan as needed.

thanks
-- Hitesh

On Dec 8, 2015, at 3:54 PM, Siddharth Seth <[email protected]> wrote:

> Using HDFS (or a filesystem other than local) is not supported yet. tmpfs
> would be your best bet in that case - we have tested with this before, but
> this has capacity limitations, and mixing tmpfs with regular disks does not
> provide a deterministic mechanism of selecting memory as the intermediate
> storage.
> Not sure if Tachyon has an NFS interface to access it - otherwise that could
> have been an option.
>
> We have made simple changes in the past to use HDFS for shuffle - primarily
> as experiments. None of that is available as patches, but IIRC - the changes
> were not very complicated. This would involve changing the fetcher to skip
> HTTP and use a pre-determined path on a specified filesystem to fetch data.
> Also, the producer would have to write out to a specific path on a non-local
> FileSystem.
>
> On Mon, Dec 7, 2015 at 11:57 AM, Raajay <[email protected]> wrote:
> I wish to set up a Tez data analysis framework where the data resides in
> memory. Currently, I have Tez (and also Hive) set up such that it can read
> from an in-memory filesystem like Tachyon.
>
> However, the intermediate data is still written to disk at each
> processing node. I considered writing to tmpfs; however, such a setup does
> not fall back to disk gracefully.
>
> Does Tez have an interface to write intermediate data to an HDFS-like
> filesystem? If yes, what are the settings?
>
> Does setting "yarn.nodemanager.local-dirs" to some HDFS or Tachyon URI
> suffice?
>
> Thanks,
> Raajay
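
For illustration only, here is a minimal Java sketch of the split proposed at the top of this thread: the logical part (partitioning on write, merging on read) is written once, and the physical part sits behind an interface. Nothing here is Tez API. PartitionStore, InMemoryStore, FileStore, writeOutput and readInput are hypothetical names, records are assumed to be single-line strings, and a local directory stands in for whatever filesystem would really be targeted. The FileStore layout is one possible reading of the "pre-determined path" idea from Sid's reply.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch only: none of these types are part of the Tez API.
final class PluggableShuffleSketch {

    /** Physical layer (hypothetical): where the records of one partition live. */
    interface PartitionStore {
        void write(String producerId, int partition, List<String> records) throws IOException;
        List<String> read(String producerId, int partition) throws IOException;
    }

    /** Keeps partitions on the heap; stands in for an in-memory target. */
    static final class InMemoryStore implements PartitionStore {
        private final Map<String, List<String>> data = new ConcurrentHashMap<>();

        @Override
        public void write(String producerId, int partition, List<String> records) {
            data.put(producerId + "/" + partition, new ArrayList<>(records));
        }

        @Override
        public List<String> read(String producerId, int partition) {
            return data.getOrDefault(producerId + "/" + partition, List.of());
        }
    }

    /** Uses an assumed fixed layout, root/producerId/part-N, so a reader can compute the path itself. */
    static final class FileStore implements PartitionStore {
        private final Path root;

        FileStore(Path root) {
            this.root = root;
        }

        private Path pathFor(String producerId, int partition) {
            return root.resolve(producerId).resolve("part-" + partition);
        }

        @Override
        public void write(String producerId, int partition, List<String> records) throws IOException {
            Path file = pathFor(producerId, partition);
            Files.createDirectories(file.getParent());
            Files.write(file, records, StandardCharsets.UTF_8);
        }

        @Override
        public List<String> read(String producerId, int partition) throws IOException {
            Path file = pathFor(producerId, partition);
            return Files.exists(file) ? Files.readAllLines(file, StandardCharsets.UTF_8) : List.of();
        }
    }

    /** Logical layer, output side: hash-partition the keys, sort each partition, hand it to the store. */
    static void writeOutput(String producerId, List<String> keys, int numPartitions,
                            PartitionStore store) throws IOException {
        List<List<String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new ArrayList<>());
        }
        for (String key : keys) {
            parts.get(Math.floorMod(key.hashCode(), numPartitions)).add(key);
        }
        for (int i = 0; i < numPartitions; i++) {
            Collections.sort(parts.get(i));
            store.write(producerId, i, parts.get(i));
        }
    }

    /** Logical layer, input side: gather one partition from every producer and return it sorted. */
    static List<String> readInput(List<String> producerIds, int partition,
                                  PartitionStore store) throws IOException {
        List<String> merged = new ArrayList<>();
        for (String id : producerIds) {
            merged.addAll(store.read(id, partition));
        }
        Collections.sort(merged); // a real merge would stream the sorted runs instead of re-sorting
        return merged;
    }

    public static void main(String[] args) throws IOException {
        PartitionStore store = new FileStore(Files.createTempDirectory("shuffle-sketch"));
        writeOutput("task-0", List.of("pear", "apple"), 2, store);
        writeOutput("task-1", List.of("fig", "kiwi"), 2, store);
        for (int p = 0; p < 2; p++) {
            System.out.println("partition " + p + ": " + readInput(List.of("task-0", "task-1"), p, store));
        }
    }
}
```

Swapping FileStore for InMemoryStore changes nothing in writeOutput or readInput, which is the point of the proposed refactoring. The sketch leaves out everything that makes the real problem hard: telling the consumer when a producer's output is complete, and what to do when that output is lost.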
