Furcy,
Yes knowing that all DAGs have some sort of access to the metastore is the
one big advantage I can see to adding a state storage to Airflow. I agree
that we shouldn't expect it to handle millions of operations a second, but
if every task in every DAG is reading and writing to the state then that is
potentially a lot of added load on the metastore. Again there are trade
offs. I certainly don't think it's the worst idea to add to Airflow, but I
don't think it is the best either.

But without any others voicing agreement with my viewpoint, I'll turn away
from the "should this be added" question, and move on to the "what should
it look like".

Jarek,
To that point what would be the reasoning for only allowing keys of a
particular structure like (dag_id, task_id, execution_date)? Why not allow
for the possibility of arbitrary keys? Putting restrictions on the keys
seems unnecessary. We can certainly add convenience methods to make
standardized keys based on a task instance or a task.

I would suggest we mirror much of structure of the Secrets backend code.

   - Create a BaseStateBackend abstract base class so that it is easy to
   implement other storage options.
   - Implement a MetastoreStateBackend as the first simple implementation
   of the abstract class.

I couldn't help myself, so here is a pastebin of a really quick rough draft
of what the BaseStateBackend abstract base class could look like:
https://pastebin.com/vKx8W9DG

It has a few methods that are left up to subclasses to implement and then a
few convenience functions that take the context dictionary passed to
operators execute method and build a few predefined keys. Obviously it's
lacking in documentation and type hinting and probably a few other issues,
but I think it covers the basics.

Chris

On Tue, Jun 2, 2020 at 8:00 AM Jarek Potiuk <[email protected]>
wrote:

> I personally think that *if* we decide to implement "state service" - it
> should be as simple as possible - and let operators decide what to do with
> it. And do not do anything with it by the Airflow Core.
>
> I think we should decide what key should be used for that data (task_id,
> dag_id, execution_date ? ) and delegate full management of the state
> lifecycle to Operator implementation. I see this as a "service" delivered
> by Airflow that the running task instance (by all instantiations of that
> task instance)  could use to access its own private data.
>
> It's more of a convenient "service" available to all operators without
> having to implement your own storage than anything else.
>
> I see the benefit for the users for that, I do not see now how abusing it
> can harm Airflow (and it's core) more than current abuse with Variables.
>
> J.
>
>
> On Tue, Jun 2, 2020 at 11:23 AM Furcy Pin <[email protected]> wrote:
>
>> Chris,
>>
>> I think that the main downside of asking users to use another external
>> storage like S3 or whatever is that it doesn't make Operator easily
>> reusable across the community.
>> If I open-source a LivyOperator that handles my problem by storing stuff
>> in Azure Blob Storage, someone on AWS won't be able to use it as-is and
>> will have to customize it,
>> or make a pull request to support S3. Then someone else will make a pull
>> request to support GS, etc. It is doable but I believe it would add
>> avoidable complexity on
>> the Operator's side.
>>
>> This is the main advantage I see with providing such functionality. I
>> think that MySQL and Postgres handle key-value use-cases just fine as long
>> as you create
>> an index for the key in the table that will store them. Of course it
>> won't handle millions of operations per second, but if you are using
>> Airflow for that you probably
>> are doing something wrong.
>>
>> I agree that in my mind Variables were meant to hold configuration and
>> secrets that can be updated manually by the users for certain use cases,
>> but not holding task internal states.
>> I don't really mind and will comply if the consensus is to use Variables
>> for that. The fact that Variables can be made read-only it a good point for
>> separating the XState use-case from it, though.
>>
>> One of the key questions if a StateService is implemented would be:
>> should the state be removed on retry or on reschedule or not at all?
>> Or maybe it's the Operator's job to know what to do depending on whether
>> it is retrying or rescheduling, but I'm not sure Operators have a way to
>> know that.
>>
>> Regards,
>>
>> Furcy
>>
>>
>> On Tue, 2 Jun 2020 at 11:03, Jarek Potiuk <[email protected]>
>> wrote:
>>
>>> I think no-one wants to remove xcom really :)
>>>
>>> On Tue, Jun 2, 2020 at 10:32 AM Michael Lutz <[email protected]>
>>> wrote:
>>>
>>>> please do not remove existing xcom functionality.   I am using it
>>>> extensively.   If you implement something more robust or elegant, that
>>>> would be fine.  I feel that a more robust state management system would be
>>>> helpful it feels like an area of improvement.
>>>>
>>>> On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <[email protected]>
>>>> wrote:
>>>>
>>>>> I think this subject came so often, that I also change my mind slowly
>>>>> in favor of making an explicit state persistence "service".
>>>>>
>>>>> Whether it's only one key or more, it's secondary, I think but if
>>>>> users are already using Variables to keep state for tasks - this is a 
>>>>> clear
>>>>> sign that we miss a crucial feature and our users are abusing Airflow
>>>>> already in the way we try to prevent by not introducing "State service".
>>>>>
>>>>> With the recent SecretBackend implementation where Variables might be
>>>>> kept in a Secret backend - not only MetaStore - potentially you might have
>>>>> no write access to the backend. There is even no "write" support in the
>>>>> current "MetastoreBackend" implementation for writing variables. So we
>>>>> already have configurations where if we try to write variables and read it
>>>>> elsewhere might not work - as far as I can see. You can set several
>>>>> backends of course and the Metastore as the last fallback of course, but
>>>>> for me, it opens up different problems - what happens if the key is 
>>>>> present
>>>>> in both, tasks writes it to metastore, but another task reads it from the
>>>>> Secret Backend.
>>>>>
>>>>> I think it seems that variables are being abused in exactly the way we
>>>>> want to prevent the "StateService" to be abused - and shying away from 
>>>>> that
>>>>> is really like closing our eyes and pretending it's not happening.
>>>>>
>>>>> So maybe we can make a change AIP with this approach:
>>>>>
>>>>> 1) Variables -> mostly read-only (for tasks)  and used to keep
>>>>> configuration shared between workers (but not on a task level).
>>>>> 2) StateService (or wherever we call it) where we keep state
>>>>> information for specific dag + task + execution_date.
>>>>>
>>>>> J.
>>>>>
>>>>>
>>>>> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Airflow already provides a mechanism for state persistence: the
>>>>>> Variable, and, with caveats and flaws, XCom.
>>>>>>
>>>>>> I personally persist state to the airflow metastore database for a
>>>>>> large percentage of our jobs.  They are incremental jobs and it is 
>>>>>> helpful
>>>>>> to keep track of watermark.
>>>>>>
>>>>>> I think that incremental jobs are probably very very common in
>>>>>> airflow implementations.  Though probably often times users resort to
>>>>>> imperfect vehicles for this such as `execution_date` or xcom.
>>>>>>
>>>>>> I have a very draftey draft aip that i haven't had enough time to
>>>>>> work on, which explores adding explicit support for state persistence to
>>>>>> airflow:
>>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence.
>>>>>> Though I understand it is a controversial idea.  (note: The AIP is not
>>>>>> ready for primetime.)
>>>>>>
>>>>>> I am of the mind that being able to persist some state is not a
>>>>>> fundamental change to airflow and would just add explicit (and more
>>>>>> user-friendly) support for something that is already quite common, and 
>>>>>> fits
>>>>>> fully within the wheelhouse of what airflow exists to do.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Furcy,
>>>>>>>
>>>>>>> To clarify, when I say that Airflow should not be in the business of
>>>>>>> keeping state about external systems, I specifically mean it shouldn't 
>>>>>>> be
>>>>>>> keeping state to be shared between task instances. I completely 
>>>>>>> understand
>>>>>>> that there may be external systems that are harder to work with, and 
>>>>>>> like
>>>>>>> in your case require the operator to be able to store some piece of
>>>>>>> information to make them idempotent. I just don't think that Airflow 
>>>>>>> should
>>>>>>> provide that storage mechanism.
>>>>>>>
>>>>>>> I would think that most users of Airflow have access to some sort of
>>>>>>> cloud storage like S3 (which are really just key-value stores), and it's
>>>>>>> easy enough to write your job_id or whatever value you care about to a 
>>>>>>> file
>>>>>>> with a prefix computed from the dag_id, task_id, execution_date or 
>>>>>>> whatever
>>>>>>> combination of them you care about. Yes it makes your operators more
>>>>>>> complex and they have to know about another system, but it keeps that
>>>>>>> complexity out of core Airflow. That's the trade off.
>>>>>>>
>>>>>>> Ash,
>>>>>>> I'm not suggesting that XCom be removed from Airflow, and I
>>>>>>> understand there are use cases where it makes some things convenient. In
>>>>>>> your example though, it would be just as easy for the sensor to write 
>>>>>>> the
>>>>>>> found object path as the contents of another file in S3, with a 
>>>>>>> computable
>>>>>>> prefix based on the dag/task/execution_date.
>>>>>>>
>>>>>>>
>>>>>>> At its heart XCom is just a key-value store where the keys are
>>>>>>> limited to a very specific set of possibilities, and where key-value 
>>>>>>> pairs
>>>>>>> are managed in some specific ways. The request here is to add another
>>>>>>> narrowly defined set of allowable keys, and as far as I can tell with no
>>>>>>> extra management of them. The only real advantage of using the Airflow
>>>>>>> database for XCom or any expansion/variation on it is that we know that 
>>>>>>> all
>>>>>>> operators have access to the database.
>>>>>>>
>>>>>>> I'm not an expert but I would wonder how well Postgres or MySQL
>>>>>>> perform as high volume key value stores. Does anyone actually use XCom 
>>>>>>> at
>>>>>>> scale, and does that extra load on the database impact scheduling and 
>>>>>>> other
>>>>>>> performance aspects of Airflow?
>>>>>>>
>>>>>>> Chris
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Just to touch on one point about XCom, and to re-assure people that
>>>>>>>> they, or something very like them are in Airflow for the foreseeable 
>>>>>>>> future.
>>>>>>>>
>>>>>>>> As an example of an appropriate use for XCom: Let's say a third
>>>>>>>> party delivers you a set of files once a week, but the exact name of 
>>>>>>>> the
>>>>>>>> files isn't known (by you) in advance. So you write a sensor that
>>>>>>>> polls/checks S3 for the Objects to appear in our bucket, and the sensor
>>>>>>>> outputs the S3 Object path to XCom, that then next processing step then
>>>>>>>> examines to process the files.
>>>>>>>>
>>>>>>>> That sort of use case is not going anywhere.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> -ash
>>>>>>>>
>>>>>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:
>>>>>>>>
>>>>>>>> At the risk of repeating myself (from the previous thread that
>>>>>>>> touched on this topic), I don't think Airflow should be in the 
>>>>>>>> business of
>>>>>>>> keeping state about external systems. Airflow is about authoring and
>>>>>>>> running workflows; it's not a messaging tool or a cluster management 
>>>>>>>> tool.
>>>>>>>> I'm not convinced that the existing XCom functionality should really 
>>>>>>>> be a
>>>>>>>> part of Airflow, and I certainly don't think it should be expanded 
>>>>>>>> upon or
>>>>>>>> new variations added. I think storing state is especially risky, if 
>>>>>>>> for no
>>>>>>>> other reason than the fact that Airflow is not the source of truth 
>>>>>>>> about
>>>>>>>> those systems. It's very likely that at some times the "state" that 
>>>>>>>> Airflow
>>>>>>>> has saved will diverge from the actual state of the external system.
>>>>>>>> Handling that nicely, probably requires a bunch of custom code in the
>>>>>>>> operators/hooks anyway, so I don't think it saves anything in terms of
>>>>>>>> operator code complexity. Users would be much better served going to 
>>>>>>>> the
>>>>>>>> source of truth to determine state. If part of the problem is that 
>>>>>>>> Livy is
>>>>>>>> lacking in features (like being able to query the status of a 
>>>>>>>> particular
>>>>>>>> job_id) then I think it would be more appropriate to add the needed
>>>>>>>> features to that project. Airflow at its core shouldn't be concerned 
>>>>>>>> with
>>>>>>>> making up for failures of other tools.
>>>>>>>>
>>>>>>>> Also as can be seen by just this discussion, it's hard to keep
>>>>>>>> these extra features from expanding in scope. Jarek proposed something 
>>>>>>>> that
>>>>>>>> would just store a single string, and immediately Furcy wants to 
>>>>>>>> expand it
>>>>>>>> to store multiple strings. Either way we are really just talking about 
>>>>>>>> a
>>>>>>>> key-value store, and putting limits on how that key can be structured; 
>>>>>>>> the
>>>>>>>> key is made up of some predefined set of Airflow entities (for Jarek's
>>>>>>>> proposal) or some arbitrary key along with those Airflow entities 
>>>>>>>> (Furcy's
>>>>>>>> proposal).
>>>>>>>>
>>>>>>>> I know in the past that I had a situation where I wanted to reuse a
>>>>>>>> cluster across multiple data intervals, if one was already running 
>>>>>>>> (this
>>>>>>>> was before I discovered Airflow so wasn't "execution dates" 
>>>>>>>> precisely). I
>>>>>>>> can equally see use cases where I might want to share some resource for
>>>>>>>> multiple tasks in a DAG, or across similar tasks in multiple DAGs. So 
>>>>>>>> if we
>>>>>>>> added this then why limit it to any one of those combinations? But 
>>>>>>>> then we
>>>>>>>> just have an arbitrary key-value store. If you want to use Airflow for 
>>>>>>>> that
>>>>>>>> then you can use Variables, if you want to use something else then you 
>>>>>>>> can.
>>>>>>>>
>>>>>>>> Unless Airflow is doing some extra management of these key-values
>>>>>>>> in some way (like it does with clearing out XCom's on reruns), then I 
>>>>>>>> see
>>>>>>>> absolutely no added benefit. And even with some potential management by
>>>>>>>> Airflow I'm still not convinced that Airflow is the right place for it.
>>>>>>>>
>>>>>>>> Chris
>>>>>>>>
>>>>>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Thank you Jarek for the detailed explanation,
>>>>>>>>
>>>>>>>> That's exactly what I wanted to do: write a feature request to
>>>>>>>> summarize all those discussions.
>>>>>>>> I agree with you that the feature should be marked distinct from
>>>>>>>> the XCom feature and that we should not piggyback this feature into 
>>>>>>>> XCom.
>>>>>>>>
>>>>>>>> The crux of the problem, I think is that with XCom you do want the
>>>>>>>> task to delete it's xcom on the beginning of the retry.
>>>>>>>> Correct me if I'm wrong but one use cases where it was necessary
>>>>>>>> was having a task A and a task B that starts immediately after A, and 
>>>>>>>> wait
>>>>>>>> from some 'signal' from A.
>>>>>>>> If A and B restart and A doesn't reset it's signal, then B will use
>>>>>>>> the signal from A's first try, which is incorrect.
>>>>>>>>
>>>>>>>> About the 3 solutions you mention:
>>>>>>>>
>>>>>>>> 1) Providing the job_id from outside. That works indeed. Sadly in
>>>>>>>> my use-case Livy's API is poorly designed and only returns a generated
>>>>>>>> job_id, you can't specify a custom one.
>>>>>>>> You can't even find a job by name, I would have to list all the
>>>>>>>> active job_ids, and do a GET for each of them to get it's name and find
>>>>>>>> which one is the one I want. It's doable but inelegant.
>>>>>>>>
>>>>>>>> 2) Store the id in an external storage. Of course it would work but
>>>>>>>> it requires an external storage. More on that below.
>>>>>>>>
>>>>>>>> 3) I'm not sure I understand completely what you mean there, but I
>>>>>>>> think you mean that the idempotency can be handled by the service you 
>>>>>>>> call
>>>>>>>> (for instance BigQuery). Indeed that is another solution. If we were 
>>>>>>>> using
>>>>>>>> Spark with a Hive metastore + locking or the deltalake storage format, 
>>>>>>>> we
>>>>>>>> could have something to prevent a job that run twice from creating
>>>>>>>> duplicates. This is another solution we are considering, but it is
>>>>>>>> coslty to change now.
>>>>>>>>
>>>>>>>> You guess correctly that the feature I was asking for me would be
>>>>>>>> to provide some utility to let the users implement solution 2) without
>>>>>>>> requiring an external storage.
>>>>>>>> I think it would be a QOL improvement for some use cases, just like
>>>>>>>> it could be argued that XCom is just a QOL improvement and users could 
>>>>>>>> have
>>>>>>>> used an external storage themselves.
>>>>>>>> The main advantage that it brings is making the custom operators
>>>>>>>> much easier to share and reuse across the Apache Airflow community,
>>>>>>>> compared to having to set up some external
>>>>>>>> storage.
>>>>>>>>
>>>>>>>> I have seen that some users used the metadata store itself as an
>>>>>>>> external storage by adding a new table to the airflow model:
>>>>>>>>
>>>>>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e
>>>>>>>>
>>>>>>>> And others suggested using XCom itself as an external storage by
>>>>>>>> storing information with a special task_id:
>>>>>>>> https://stackoverflow.com/a/57515143/2087478
>>>>>>>>
>>>>>>>> In the discussion thread you provided it was also suggested to use
>>>>>>>> Variables to store some persisting information.
>>>>>>>>
>>>>>>>> These 3 approaches work but feel quite "hacky" and I believe that
>>>>>>>> providing such functionality would be good.
>>>>>>>>
>>>>>>>> Finally, I don't see the point of limiting the functionality to
>>>>>>>> such extent, providing a "IdempotencyIdStorage" that only allows you to
>>>>>>>> store a string
>>>>>>>> will just force people who need to store more than one id for one
>>>>>>>> task (for whatever reason) to use some hack again, like storing a json
>>>>>>>> inside the storage.
>>>>>>>>
>>>>>>>> I was more thinking about something quite similar to XCom (I liked
>>>>>>>> the XState name suggestion), where the entry would be keyed by 
>>>>>>>> "(dag_id,
>>>>>>>> task_id, execution_date, key)"
>>>>>>>> where "key" can be whatever you want and would be kept across
>>>>>>>> retries.
>>>>>>>>
>>>>>>>> I have read (quickly) through the "Pandora's Box" thread you
>>>>>>>> linked. Indeed it looks like there would be many ways to misuse such
>>>>>>>> feature.
>>>>>>>> I do understand the important of idempotency, and it looks like my
>>>>>>>> use case is one of the first ever listed where I do need to persist a 
>>>>>>>> state
>>>>>>>> across retries to make my operator really idempotent.
>>>>>>>>
>>>>>>>> I'm surprised no one came up with it given how frequent the Spark +
>>>>>>>> Airflow combination is (well, the BigQueryOperator was one too but 
>>>>>>>> found
>>>>>>>> another solution).
>>>>>>>>
>>>>>>>> Of course we can blame it on Livy for being poorly conceived
>>>>>>>> (unlike BigQuery) or we can blame it on Spark for not having a built-in
>>>>>>>> security mechanism to prevent double-writes,
>>>>>>>> but I think that as the above hacks show, you can't really prevent
>>>>>>>> users from shooting themselves in the foot if that's what they really 
>>>>>>>> want
>>>>>>>> to.
>>>>>>>>
>>>>>>>> While I do think that making things foolproof is important, I
>>>>>>>> believe it's also in Python's philosophy to *not* make things
>>>>>>>> foolproof at the detriment of simplicity for the right use cases.
>>>>>>>> But I do understand that the use cases are different and
>>>>>>>> contradictory: some would require the state to be persisted across
>>>>>>>> reschedule and not retries, mine would require the state to be 
>>>>>>>> persisted
>>>>>>>> across retries and not reschedule.
>>>>>>>>
>>>>>>>> Maybe the Airflow-y way for that would be to have one task that
>>>>>>>> does the submit and an xcom with the job, then one task that check the
>>>>>>>> progress of the job, but that feels very cumbersome to double the 
>>>>>>>> number of
>>>>>>>> tasks just for that. Plus I'm not sure we could make the first task 
>>>>>>>> retry
>>>>>>>> if the second task fails...
>>>>>>>>
>>>>>>>> Thanks again,
>>>>>>>>
>>>>>>>> Furcy
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> I think we've discussed several approaches like that and using Xcom
>>>>>>>> name (which for many people would mean "let's just extend XCom table 
>>>>>>>> for
>>>>>>>> that" is not a very good idea to use it IMHO. I think this is very
>>>>>>>> different functionality/logic which we might or might not agree to
>>>>>>>> implement as a community. Naming it "Xcom" to trying to extend the XCom
>>>>>>>> table behavior might be problematic.
>>>>>>>>
>>>>>>>> Not sure if you are aware but we had very similar discussion about
>>>>>>>> it recently (without clear conclusions but at least you can see what 
>>>>>>>> kind
>>>>>>>> of issues/problems different people have with this approach)
>>>>>>>>
>>>>>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>>>>>
>>>>>>>> I am not saying it is impossible to do, but I think it's a matter
>>>>>>>> of how we formulate the "use case". It's very tempting to implement a
>>>>>>>> generic - intra-task communication mechanism, indeed. But it can very
>>>>>>>> easily lead to people abusing it and bypassing the guarantees 
>>>>>>>> (idempotency
>>>>>>>> mainly) that Airflow provides for backfilling and re-running tasks. I
>>>>>>>> thought a bit after the latest discussion kind of died out, and I have 
>>>>>>>> one
>>>>>>>> possible solution to the problem.
>>>>>>>>
>>>>>>>> Let me explain what I think about it (but others can have different
>>>>>>>> opinions of course):
>>>>>>>>
>>>>>>>> So far the discussion was that there are several ways to achieve
>>>>>>>> what you want (and it's really about what entity is providing the
>>>>>>>> "idempotency" guarantee:
>>>>>>>>
>>>>>>>> 1) Similarly as just merged in the BigQuery Insert Job
>>>>>>>> https://github.com/apache/airflow/pull/8868/files - you can
>>>>>>>> provide job_id from outside. You'd need to work out the job_id naming 
>>>>>>>> that
>>>>>>>> works in your case and make sure that when you re-run your task with 
>>>>>>>> the
>>>>>>>> same (dag_id, task_id, execution date) you will get the same id. Then 
>>>>>>>> the
>>>>>>>> "uniqueness" thus idempotency is handled by the logic written in the 
>>>>>>>> DAG.
>>>>>>>>
>>>>>>>> 2) Store the DAG id in some external storage (via one of the hooks
>>>>>>>> - where it can be queried in the way that will work for you). Then the
>>>>>>>> idempotency is actually handled by the logic in your Operator + some
>>>>>>>> external storage.
>>>>>>>>
>>>>>>>> 3) Query your service and retrieve the JOB ID from it - but you
>>>>>>>> have to have a way to query for the job related to your "dag id  + task
>>>>>>>> + execution_date". Then - the idempotency is actually handling by the
>>>>>>>> Service you are using.
>>>>>>>>
>>>>>>>> In the use case, you describe - this is the only thing you need -
>>>>>>>> "idempotency source". I believe you would like to get the case 2) from
>>>>>>>> above but without having to use external storage to store the "unique 
>>>>>>>> id".
>>>>>>>> Something that will let each task in the same dag run to set or 
>>>>>>>> retrieve a
>>>>>>>> unique value for that particular task. One value should be enough -
>>>>>>>> assuming that each operator/task works on one external data "source".
>>>>>>>>
>>>>>>>> My current thinking is:
>>>>>>>>
>>>>>>>> Why don't we provide such a dedicated, idempotency service inside
>>>>>>>> Airflow? We already have a DB and we could have 
>>>>>>>> an"IdempotencyIdStorage"
>>>>>>>> class with two methods:
>>>>>>>>
>>>>>>>>   * .set(id: str) and
>>>>>>>>   * .get() -> str
>>>>>>>>
>>>>>>>> And the data stored there should be a string keyed by "dag_id,
>>>>>>>> task_id, execution_date)" - available also via Jinja templating. There 
>>>>>>>> is
>>>>>>>> no intra-task communication, here, very little possibility of abuse 
>>>>>>>> and it
>>>>>>>> seems to solve the major pain point where you have to provide your own
>>>>>>>> storage to get the idempotency if your service does not provide one or 
>>>>>>>> you
>>>>>>>> do not want to delegate it to the DAG writer.
>>>>>>>>
>>>>>>>> J.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> The use case I'm referring to is that you can't use xcom to let a
>>>>>>>> task read information from it's past attempts, because when a task 
>>>>>>>> starts
>>>>>>>> it's xcom is automatically deleted.
>>>>>>>>
>>>>>>>> My specific use case is that we have a custom LivyOperator that
>>>>>>>> calls Livy to start batch Spark Jobs.
>>>>>>>> When you start a batch job Livy returns a job_id
>>>>>>>> Sometimes our operator can fail for one reason or another (for
>>>>>>>> instance if Livy is unreachable for a while)
>>>>>>>> When the task retries, it calls Livy again, which start the same
>>>>>>>> spark job, but the problem is that the spark job from the first 
>>>>>>>> attempt can
>>>>>>>> still be running,
>>>>>>>> and then we have a batch job that runs twice simultaneously and
>>>>>>>> creates duplicates in the output.
>>>>>>>>
>>>>>>>> What we tried to do is getting the job_id from the first try, to
>>>>>>>> check if the job is still running, and wait for it to complete if it 
>>>>>>>> is.
>>>>>>>>
>>>>>>>> We tried using xcom to let the task send a message to itself (to
>>>>>>>> it's next try) but xcom is meant for "inter-task communication" only so
>>>>>>>> this doesn't work and is not intended to work.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Furcy,
>>>>>>>>
>>>>>>>> Can you give a concrete example of what you mean by intra-task
>>>>>>>> xcom? Depending your use case this may already be possible.
>>>>>>>>
>>>>>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I would like to open a feature request for Airflow to support
>>>>>>>> "intra-task xcom".
>>>>>>>>
>>>>>>>> It seems that there are several distinct use cases for it already
>>>>>>>> and only ugly workarounds and I wanted to list them in a JIRA
>>>>>>>> ticket.
>>>>>>>>
>>>>>>>> I wanted to summarize links to the use cases and past attempts,
>>>>>>>> and the recommended approach (which apparently would be to create
>>>>>>>> a distinct feature from xcom to support this, it could be calle
>>>>>>>> intra-com or self-com ?)
>>>>>>>>
>>>>>>>> Do you know if such ticket already exists? I couldn't find one.
>>>>>>>> Also I can't create any ticket due to some obscure bug (see my
>>>>>>>> other email).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Furcy
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Jarek Potiuk
>>>>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>>>>
>>>>>>>> M: +48 660 796 129 <+48660796129>
>>>>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> Jarek Potiuk
>>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>>>
>>>>> M: +48 660 796 129 <+48660796129>
>>>>> [image: Polidea] <https://www.polidea.com/>
>>>>>
>>>>>
>>>
>>> --
>>>
>>> Jarek Potiuk
>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>
>>> M: +48 660 796 129 <+48660796129>
>>> [image: Polidea] <https://www.polidea.com/>
>>>
>>>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
> [image: Polidea] <https://www.polidea.com/>
>
>

Reply via email to