Jeremiah Lowin created AIRFLOW-825:
--------------------------------------

             Summary: Add Dataflow semantics
                 Key: AIRFLOW-825
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-825
             Project: Apache Airflow
          Issue Type: Improvement
          Components: Dataflow
            Reporter: Jeremiah Lowin
            Assignee: Jeremiah Lowin


Following discussion on the dev list, this adds first-class Dataflow semantics 
to Airflow. 

Please see my PR for examples and unit tests. From the documentation:

A Dataflow object represents the result of an upstream task. If the upstream
task has multiple outputs contained in a tuple, dict, or other indexable form,
an index may be provided so the Dataflow only uses the appropriate output.
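
For illustration, here is a minimal sketch of constructing an indexed Dataflow in a
DAG. The Dataflow import path and constructor signature are assumptions on my part;
the actual API is in the PR:

{code}
# Minimal sketch; the Dataflow import path and constructor signature are
# assumptions -- see the PR for the actual API.
from datetime import datetime

from airflow import DAG
from airflow.models import Dataflow  # assumed import path
from airflow.operators.python_operator import PythonOperator

dag = DAG('dataflow_example', start_date=datetime(2017, 1, 1))

def produce():
    # Upstream task returns multiple outputs in an indexable (dict) form.
    return {'model_path': '/tmp/model.pkl', 'accuracy': 0.97}

produce_task = PythonOperator(task_id='produce', python_callable=produce, dag=dag)

# Expose only the 'accuracy' entry of the upstream result downstream.
accuracy_flow = Dataflow(produce_task, index='accuracy')
{code}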


Dataflows are passed to downstream tasks with a key. This has two effects:
    1. It sets up a dependency between the upstream and downstream tasks to
       ensure that the downstream task does not run before the upstream result
       is available.
    2. It ensures that the [indexed] upstream result is available in the
       downstream task's context as ``context['dataflows'][key]``. In addition,
       the result will be passed directly to PythonOperators as a keyword
       argument.
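
Continuing the sketch above, one plausible shape of the downstream side. The
``dataflows=`` operator argument is an assumption, but the context key and the
keyword-argument behavior follow the description:

{code}
# Continues the sketch above; the `dataflows=` argument name is an assumption.
def consume(accuracy=None, **context):
    # The indexed upstream result is available in the template context...
    assert context['dataflows']['accuracy'] == accuracy
    # ...and, because this is a PythonOperator, also passed as a keyword argument.
    print('upstream accuracy:', accuracy)

consume_task = PythonOperator(
    task_id='consume',
    python_callable=consume,
    provide_context=True,
    dataflows={'accuracy': accuracy_flow},  # also adds produce_task -> consume_task dependency
    dag=dag,
)
{code}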

Dataflows use the XCom mechanism to exchange data. Data is passed through the
following series of steps:
    1. After the upstream task runs, data is passed to the Dataflow object's
       _set_data() method.
    2. The Dataflow's serialize() method is called on the data. This method
       takes the data object and returns a representation that can be used to
       reconstruct it later.
    3. _set_data() stores the serialized result as an XCom.
    4. Before the downstream task runs, it calls the Dataflow's _get_data()
       method.
    5. _get_data() retrieves the upstream XCom.
    6. The Dataflow's deserialize() method is called. This method takes the
       serialized representation and returns the data object.
    7. The data object is passed to the downstream task.
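
To make steps 2 and 6 concrete, a custom Dataflow could override serialize() and
deserialize(). The JSON encoding below is just an assumed example, not part of
the PR:

{code}
import json

# Hypothetical subclass; JSON is just one possible representation.
class JSONDataflow(Dataflow):

    def serialize(self, data):
        # Step 2: return a representation that can be stored as an XCom
        # and used to reconstruct the data later.
        return json.dumps(data)

    def deserialize(self, serialized):
        # Step 6: rebuild the original data object from that representation.
        return json.loads(serialized)
{code}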

The basic Dataflow object has identity serialize and deserialize methods,
meaning data is stored directly in the Airflow database. Therefore, for
performance and practical reasons, basic Dataflows should not be used with
large or complex results.

Dataflows can easily be extended to use remote storage. In this case, the
serialize method should write the data to storage and return a URI, which
will be stored as an XCom. The URI will be passed to deserialize() so that
the data can be downloaded and reconstructed.
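
As a rough illustration, using the local filesystem as a stand-in for real remote
storage and assuming the same serialize()/deserialize() hooks:

{code}
import json
import os
import uuid

# Hypothetical subclass; a local directory stands in for real remote storage.
class FileStorageDataflow(Dataflow):

    STORAGE_DIR = '/tmp/airflow_dataflows'

    def serialize(self, data):
        # Write the data to storage and return only a URI; the URI
        # (not the data itself) is what gets stored as an XCom.
        os.makedirs(self.STORAGE_DIR, exist_ok=True)
        uri = os.path.join(self.STORAGE_DIR, '{}.json'.format(uuid.uuid4()))
        with open(uri, 'w') as f:
            json.dump(data, f)
        return uri

    def deserialize(self, uri):
        # Fetch the data back from the URI and reconstruct the object.
        with open(uri) as f:
            return json.load(f)
{code}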



