To clarify, by information handshake, I meant how to tell the downstream vertex tasks where the generating task wrote its data and when to start reading it. If this can somehow be pre-defined at plan build time, sure, you probably don't need a lot of info to be sent downstream, as it can be driven via some config + rules (e.g. configured base path + appId + vertexId/Name + taskId). However, there are some gotchas in terms of when the downstream vertex can start reading data. If the downstream task starts reading before the upstream task has finished completely, that is fine as long as the upstream task does not fail and a new attempt is not launched. If a new attempt has to be launched, the downstream task would need to revert all processing of the earlier data and replay the new attempt's data for correctness. A simple answer for this can be a signal trigger, say by touching a file saying task1 is done, which informs downstream tasks that task1's data is ready to be read. And obviously (similar to shuffle data), if the data location is more dynamic or more heavily protected (say, using dynamically generated secrets/tokens), then additional information needs to be sent downstream.
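To make the marker-file handshake concrete, here is a minimal sketch against the Hadoop FileSystem API. The path layout, class, and method names are all illustrative assumptions, not an existing Tez convention:

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;

// Sketch of the "touch a done file" idea: the upstream task touches
// <base>/<appId>/<vertexId>/<taskId>.done after its last write, and the
// downstream task polls for that marker before it starts reading.
public class DoneFileHandshake {

  // Deterministic location derived purely from config + rules.
  static Path dataPath(Path base, String appId, String vertexId, int taskId) {
    return new Path(base, appId + "/" + vertexId + "/" + taskId);
  }

  static Path donePath(Path base, String appId, String vertexId, int taskId) {
    return dataPath(base, appId, vertexId, taskId).suffix(".done");
  }

  // Upstream side: touch the marker only after all data is durably written.
  static void signalDone(FileSystem fs, Path done) throws IOException {
    fs.create(done, true).close();
  }

  // Downstream side: block until the marker exists. Note this does not by
  // itself handle a failed upstream attempt being re-run; that still needs
  // the revert-and-replay handling described above.
  static void awaitDone(FileSystem fs, Path done, long pollMs)
      throws IOException, InterruptedException {
    while (!fs.exists(done)) {
      Thread.sleep(pollMs);
    }
  }
}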
thanks
— Hitesh

On Dec 8, 2015, at 5:50 PM, Raajay <[email protected]> wrote:

> Thanks for the valuable inputs.
>
> A quick clarification:
>
> "- Tez uses DataMovementEvents to inform the downstream vertex on where to
> pull data from. This information handshake is part of the Input/Output pair
> implementation."
>
> If the edges had type PERSISTED_RELIABLE, the information handshake is
> probably not needed. Is that right?
>
> - Raajay
>
> On Tue, Dec 8, 2015 at 6:17 PM, Hitesh Shah <[email protected]> wrote:
> The other way to look at this problem is that for a given edge between 2
> vertices, the data format and transfer mechanism are governed by the Output
> of the upstream vertex and the Input of the downstream vertex. You can
> potentially write your own Input and Output pair that works with HDFS or
> Tachyon for intermediate data, but there are a few things to be aware of:
> - Depending on the operator, the data is expected to be potentially
> partitioned and/or sorted. This will drive how you store and read data.
> - Tez uses DataMovementEvents to inform the downstream vertex on where to
> pull data from. This information handshake is part of the Input/Output pair
> implementation.
> - The failure semantics also change depending on the kind of storage. Today,
> most edges use type PERSISTED. This means that the data can survive the
> container going away, but not the node/machine disappearing. If using HDFS,
> that would become type PERSISTED_RELIABLE, meaning the data is always
> reliably available (even if the node on which the data was generated
> disappears). I don't believe this is handled yet, so that would be a new
> enhancement to Tez to fix the failure semantics for such an edge.
>
> If you are using Hive, this would mean making changes to Hive too, to change
> the DAG plan as needed.
>
> thanks
> — Hitesh
>
> On Dec 8, 2015, at 3:54 PM, Siddharth Seth <[email protected]> wrote:
>
> > Using HDFS (or a filesystem other than local) is not supported yet. tmpfs
> > would be your best bet in that case - we have tested with this before, but
> > it has capacity limitations, and mixing tmpfs with regular disks does not
> > provide a deterministic mechanism for selecting memory as the intermediate
> > storage.
> > Not sure if Tachyon has an NFS interface to access it - otherwise that
> > could have been an option.
> >
> > We have made simple changes in the past to use HDFS for shuffle, primarily
> > as experiments. None of that is available as patches, but IIRC the changes
> > were not very complicated. This would involve changing the fetcher to skip
> > HTTP and use a pre-determined path on a specified filesystem to fetch data,
> > and the producer to write out to a specific path on a non-local FileSystem.
> >
> > On Mon, Dec 7, 2015 at 11:57 AM, Raajay <[email protected]> wrote:
> > I wish to set up a Tez data-analysis framework where the data resides in
> > memory. Currently, I have Tez (and also Hive) set up such that it can read
> > from an in-memory filesystem like Tachyon.
> >
> > However, the intermediate data is still written to disk at each processing
> > node. I considered writing to tmpfs; however, such a setup does not fall
> > back to disk gracefully.
> >
> > Does Tez have an interface to write intermediate data to an HDFS-like
> > filesystem? If yes, what are the settings?
> >
> > Does setting "yarn.nodemanager.local-dirs" to some HDFS or Tachyon URI
> > suffice?
> >
> > Thanks,
> > Raajay
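For reference, the fetcher/producer change Sid describes in the quoted thread maps to something like the following rough sketch. The class, path scheme, and wiring are hypothetical, not actual Tez code: the producer writes each partition to an agreed-upon path on a non-local FileSystem, and the fetcher opens that same path directly instead of pulling data over HTTP.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;

// Illustrative only: both sides agree on <base>/<srcTask>/<partition>, so no
// DataMovementEvent payload is needed to locate the data.
public class HdfsShuffleSketch {

  private final FileSystem fs;
  private final Path base; // e.g. an application-scoped dir on HDFS or Tachyon

  public HdfsShuffleSketch(URI fsUri, Path base, Configuration conf)
      throws IOException {
    this.fs = FileSystem.get(fsUri, conf);
    this.base = base;
  }

  // Producer side: one file per (source task, target partition).
  public FSDataOutputStream createPartition(int srcTask, int partition)
      throws IOException {
    return fs.create(new Path(base, srcTask + "/" + partition), true);
  }

  // Fetcher side: skip HTTP entirely and read the agreed-upon path.
  public FSDataInputStream openPartition(int srcTask, int partition)
      throws IOException {
    return fs.open(new Path(base, srcTask + "/" + partition));
  }
}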
