Please do not remove the existing XCom functionality; I am using it extensively. If you implement something more robust or elegant, that would be fine. I feel that a more robust state management system would be helpful; it feels like an area for improvement.
On Tue, Jun 2, 2020, 3:05 AM Jarek Potiuk <[email protected]> wrote:

> I think this subject has come up so often that I am also slowly changing my mind in favor of making an explicit state persistence "service".
>
> Whether it's only one key or more is secondary, I think. But if users are already using Variables to keep state for tasks, this is a clear sign that we are missing a crucial feature, and our users are already abusing Airflow in the way we try to prevent by not introducing a "StateService".
>
> With the recent SecretBackend implementation, where Variables might be kept in a secret backend rather than only the MetaStore, you might potentially have no write access to the backend. There is not even "write" support in the current "MetastoreBackend" implementation for writing variables. So, as far as I can see, we already have configurations where writing a variable in one place and reading it elsewhere might not work. You can of course set several backends with the Metastore as the last fallback, but for me that opens up different problems: what happens if the key is present in both, one task writes it to the metastore, but another task reads it from the secret backend?
>
> I think variables are being abused in exactly the way we want to prevent the "StateService" from being abused, and shying away from that is really like closing our eyes and pretending it's not happening.
>
> So maybe we can make a change AIP with this approach:
>
> 1) Variables -> mostly read-only (for tasks) and used to keep configuration shared between workers (but not on a task level).
> 2) StateService (or whatever we call it), where we keep state information for a specific dag + task + execution_date.
>
> J.
>
> On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <[email protected]> wrote:
>
>> Airflow already provides a mechanism for state persistence: the Variable, and, with caveats and flaws, XCom.
>>
>> I personally persist state to the Airflow metastore database for a large percentage of our jobs. They are incremental jobs, and it is helpful to keep track of a watermark.
>>
>> I think that incremental jobs are probably very common in Airflow implementations, though users probably often resort to imperfect vehicles for this, such as `execution_date` or XCom.
>>
>> I have a very draftey draft AIP that I haven't had enough time to work on, which explores adding explicit support for state persistence to Airflow: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence. Though I understand it is a controversial idea. (Note: the AIP is not ready for primetime.)
>>
>> I am of the mind that being able to persist some state is not a fundamental change to Airflow. It would add explicit (and more user-friendly) support for something that is already quite common, and it fits fully within the wheelhouse of what Airflow exists to do.
>>
>> On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]> wrote:
>>
>>> Furcy,
>>>
>>> To clarify, when I say that Airflow should not be in the business of keeping state about external systems, I specifically mean it shouldn't be keeping state to be shared between task instances. I completely understand that there may be external systems that are harder to work with, and that, like in your case, require the operator to be able to store some piece of information to make them idempotent. I just don't think that Airflow should provide that storage mechanism.
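Daniel's incremental-watermark pattern above is easy to make concrete. Below is a minimal sketch using Variables as the persistence layer; the key scheme and the `extract_fn` callable are illustrative assumptions, not from AIP-30:

```python
# Minimal sketch of incremental-load state kept in an Airflow Variable.
# The key scheme and extract_fn are hypothetical.
from airflow.models import Variable


def run_incremental_load(dag_id, task_id, extract_fn):
    key = f"watermark__{dag_id}__{task_id}"
    # Fall back to a sentinel value on the very first run.
    last_watermark = Variable.get(key, default_var="1970-01-01T00:00:00")
    # extract_fn loads all rows newer than the watermark and
    # returns the new high-water mark it reached.
    new_watermark = extract_fn(since=last_watermark)
    Variable.set(key, new_watermark)  # persisted for the next run
```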
>>> I would think that most users of Airflow have access to some sort of cloud storage like S3 (which is really just a key-value store), and it's easy enough to write your job_id, or whatever value you care about, to a file with a prefix computed from the dag_id, task_id, execution_date, or whatever combination of them you care about. Yes, it makes your operators more complex, and they have to know about another system, but it keeps that complexity out of core Airflow. That's the trade-off.
>>>
>>> Ash,
>>> I'm not suggesting that XCom be removed from Airflow, and I understand there are use cases where it makes some things convenient. In your example, though, it would be just as easy for the sensor to write the found object path as the contents of another file in S3, with a computable prefix based on the dag/task/execution_date.
>>>
>>> At its heart, XCom is just a key-value store where the keys are limited to a very specific set of possibilities and where key-value pairs are managed in some specific ways. The request here is to add another narrowly defined set of allowable keys, and, as far as I can tell, with no extra management of them. The only real advantage of using the Airflow database for XCom, or any expansion/variation on it, is that we know all operators have access to the database.
>>>
>>> I'm not an expert, but I would wonder how well Postgres or MySQL perform as high-volume key-value stores. Does anyone actually use XCom at scale, and does that extra load on the database impact scheduling and other performance aspects of Airflow?
>>>
>>> Chris
>>>
>>> On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]> wrote:
>>>
>>>> Just to touch on one point about XCom, and to reassure people: they, or something very like them, are in Airflow for the foreseeable future.
>>>>
>>>> As an example of an appropriate use for XCom: let's say a third party delivers you a set of files once a week, but the exact names of the files aren't known (by you) in advance. So you write a sensor that polls/checks S3 for the objects to appear in your bucket, and the sensor outputs the S3 object path to XCom, which the next processing step then examines to process the files.
>>>>
>>>> That sort of use case is not going anywhere.
>>>>
>>>> Cheers,
>>>> -ash
>>>>
>>>> On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:
>>>>
>>>> At the risk of repeating myself (from the previous thread that touched on this topic), I don't think Airflow should be in the business of keeping state about external systems. Airflow is about authoring and running workflows; it's not a messaging tool or a cluster management tool. I'm not convinced that the existing XCom functionality should really be a part of Airflow, and I certainly don't think it should be expanded upon or new variations added. I think storing state is especially risky, if for no other reason than the fact that Airflow is not the source of truth about those systems. It's very likely that at times the "state" that Airflow has saved will diverge from the actual state of the external system. Handling that nicely probably requires a bunch of custom code in the operators/hooks anyway, so I don't think it saves anything in terms of operator code complexity. Users would be much better served going to the source of truth to determine state.
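Chris's suggestion above, writing state to cloud storage under a prefix computed from dag_id/task_id/execution_date, reduces to a few lines. A hedged sketch with boto3; the bucket name and key layout are assumptions:

```python
# Sketch of per-task state persisted in S3 instead of the Airflow DB.
# Bucket name and key layout are illustrative assumptions.
import boto3


def _state_key(dag_id, task_id, execution_date):
    return f"airflow-state/{dag_id}/{task_id}/{execution_date.isoformat()}"


def save_state(dag_id, task_id, execution_date, value, bucket="my-state-bucket"):
    boto3.client("s3").put_object(
        Bucket=bucket,
        Key=_state_key(dag_id, task_id, execution_date),
        Body=value.encode("utf-8"),
    )


def load_state(dag_id, task_id, execution_date, bucket="my-state-bucket"):
    obj = boto3.client("s3").get_object(
        Bucket=bucket, Key=_state_key(dag_id, task_id, execution_date)
    )
    return obj["Body"].read().decode("utf-8")
```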
>>>> If part of the problem is that Livy is lacking in features (like being able to query the status of a particular job_id), then I think it would be more appropriate to add the needed features to that project. Airflow at its core shouldn't be concerned with making up for the failures of other tools.
>>>>
>>>> Also, as can be seen by just this discussion, it's hard to keep these extra features from expanding in scope. Jarek proposed something that would store just a single string, and immediately Furcy wants to expand it to store multiple strings. Either way, we are really just talking about a key-value store and putting limits on how the key can be structured; the key is made up of some predefined set of Airflow entities (for Jarek's proposal) or some arbitrary key along with those Airflow entities (Furcy's proposal).
>>>>
>>>> I know in the past I had a situation where I wanted to reuse a cluster across multiple data intervals if one was already running (this was before I discovered Airflow, so it wasn't "execution dates" precisely). I can equally see use cases where I might want to share some resource across multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we added this, then why limit it to any one of those combinations? But then we just have an arbitrary key-value store. If you want to use Airflow for that, then you can use Variables; if you want to use something else, then you can.
>>>>
>>>> Unless Airflow is doing some extra management of these key-values in some way (like it does with clearing out XComs on reruns), then I see absolutely no added benefit. And even with some potential management by Airflow, I'm still not convinced that Airflow is the right place for it.
>>>>
>>>> Chris
>>>>
>>>> On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:
>>>>
>>>> Thank you Jarek for the detailed explanation.
>>>>
>>>> That's exactly what I wanted to do: write a feature request to summarize all those discussions. I agree with you that the feature should be marked as distinct from the XCom feature and that we should not piggyback this feature onto XCom.
>>>>
>>>> The crux of the problem, I think, is that with XCom you do want the task to delete its xcom at the beginning of a retry. Correct me if I'm wrong, but one use case where this was necessary was having a task A and a task B that starts immediately after A and waits for some 'signal' from A. If A and B restart and A doesn't reset its signal, then B will use the signal from A's first try, which is incorrect.
>>>>
>>>> About the 3 solutions you mention:
>>>>
>>>> 1) Providing the job_id from outside. That works indeed. Sadly, in my use case Livy's API is poorly designed and only returns a generated job_id; you can't specify a custom one. You can't even find a job by name: I would have to list all the active job_ids and do a GET for each of them to get its name and find which one is the one I want. It's doable but inelegant (see the sketch below).
>>>>
>>>> 2) Store the id in an external storage. Of course it would work, but it requires an external storage. More on that below.
>>>>
>>>> 3) I'm not sure I understand completely what you mean there, but I think you mean that the idempotency can be handled by the service you call (for instance BigQuery). Indeed, that is another solution.
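The lookup Furcy calls "doable but inelegant" in 1) would look roughly like the sketch below. The endpoint shapes and the "name" field are assumptions based on his description of the Livy API, not verified against it:

```python
# Hedged sketch: find a Livy batch id by listing active batches and
# fetching each one until the name matches. Response shapes are assumed.
import requests


def find_batch_id_by_name(livy_url, wanted_name):
    batches = requests.get(f"{livy_url}/batches").json().get("sessions", [])
    for batch in batches:
        detail = requests.get(f"{livy_url}/batches/{batch['id']}").json()
        if detail.get("name") == wanted_name:
            return batch["id"]
    return None  # no active batch carries that name
```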
>>>> If we were using Spark with a Hive metastore + locking, or the deltalake storage format, we could have something to prevent a job that runs twice from creating duplicates. This is another solution we are considering, but it is costly to change now.
>>>>
>>>> You guessed correctly: the feature I was asking for would provide some utility to let users implement solution 2) without requiring an external storage. I think it would be a QOL improvement for some use cases, just as it could be argued that XCom is just a QOL improvement and users could have used an external storage themselves. The main advantage it brings is making custom operators much easier to share and reuse across the Apache Airflow community, compared to having to set up some external storage.
>>>>
>>>> I have seen that some users used the metadata store itself as an external storage by adding a new table to the Airflow model:
>>>> http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e
>>>>
>>>> And others suggested using XCom itself as an external storage by storing information with a special task_id:
>>>> https://stackoverflow.com/a/57515143/2087478
>>>>
>>>> In the discussion thread you provided, it was also suggested to use Variables to store some persisting information.
>>>>
>>>> These 3 approaches work but feel quite "hacky", and I believe that providing such functionality would be good.
>>>>
>>>> Finally, I don't see the point of limiting the functionality to such an extent. Providing an "IdempotencyIdStorage" that only allows you to store a string will just force people who need to store more than one id for one task (for whatever reason) to resort to some hack again, like storing JSON inside the storage.
>>>>
>>>> I was thinking of something more like XCom (I liked the XState name suggestion), where the entry would be keyed by "(dag_id, task_id, execution_date, key)", where "key" can be whatever you want, and would be kept across retries.
>>>>
>>>> I have read (quickly) through the "Pandora's Box" thread you linked. Indeed, it looks like there would be many ways to misuse such a feature. I do understand the importance of idempotency, and it looks like my use case is one of the first ever listed where I do need to persist a state across retries to make my operator really idempotent.
>>>>
>>>> I'm surprised no one came up with it before, given how frequent the Spark + Airflow combination is (well, the BigQueryOperator was one case too, but it found another solution).
>>>>
>>>> Of course we can blame it on Livy for being poorly conceived (unlike BigQuery), or we can blame it on Spark for not having a built-in safeguard to prevent double-writes, but I think that, as the above hacks show, you can't really prevent users from shooting themselves in the foot if that's what they really want to do.
>>>>
>>>> While I do think that making things foolproof is important, I believe it's also in Python's philosophy to *not* make things foolproof to the detriment of simplicity for the right use cases. But I do understand that the use cases are different and contradictory: some would require the state to be persisted across reschedules and not retries; mine would require the state to be persisted across retries and not reschedules.
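Furcy's "XState" idea, entries keyed by (dag_id, task_id, execution_date, key) and kept across retries, could be modelled with a table like the one below. This is a hypothetical schema sketched in plain SQLAlchemy, not an existing Airflow API:

```python
# Hypothetical "XState" table: one value per (dag_id, task_id,
# execution_date, key), surviving task retries. Plain SQLAlchemy sketch.
from sqlalchemy import Column, DateTime, String, Text
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class XState(Base):
    __tablename__ = "xstate"
    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    execution_date = Column(DateTime, primary_key=True)
    key = Column(String(512), primary_key=True)
    value = Column(Text)


def set_state(session, dag_id, task_id, execution_date, key, value):
    # merge() upserts, so a retry can overwrite its own previous value
    # while values written by earlier attempts remain readable.
    session.merge(XState(dag_id=dag_id, task_id=task_id,
                         execution_date=execution_date, key=key, value=value))
    session.commit()


def get_state(session, dag_id, task_id, execution_date, key):
    return session.query(XState).get((dag_id, task_id, execution_date, key))
```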
>>>> Maybe the Airflow-y way to do that would be to have one task that does the submit and puts the job id in an xcom, then another task that checks the progress of the job, but it feels very cumbersome to double the number of tasks just for that. Plus, I'm not sure we could make the first task retry if the second task fails...
>>>>
>>>> Thanks again,
>>>>
>>>> Furcy
>>>>
>>>> On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]> wrote:
>>>>
>>>> I think we've discussed several approaches like that, and using the XCom name (which for many people would mean "let's just extend the XCom table for that") is not a very good idea IMHO. I think this is very different functionality/logic, which we might or might not agree to implement as a community. Naming it "XCom" and trying to extend the XCom table behavior might be problematic.
>>>>
>>>> Not sure if you are aware, but we had a very similar discussion about this recently (without clear conclusions, but at least you can see what kinds of issues/problems different people have with this approach):
>>>> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
>>>>
>>>> I am not saying it is impossible to do, but I think it's a matter of how we formulate the "use case". It's very tempting to implement a generic intra-task communication mechanism, indeed. But it can very easily lead to people abusing it and bypassing the guarantees (idempotency, mainly) that Airflow provides for backfilling and re-running tasks. I thought a bit after the latest discussion kind of died out, and I have one possible solution to the problem.
>>>>
>>>> Let me explain what I think about it (but others can have different opinions, of course).
>>>>
>>>> So far the discussion has been that there are several ways to achieve what you want (and it's really about which entity provides the "idempotency" guarantee):
>>>>
>>>> 1) Similarly to the just-merged BigQuery insert job https://github.com/apache/airflow/pull/8868/files - you can provide the job_id from outside. You'd need to work out a job_id naming scheme that works in your case and make sure that when you re-run your task with the same (dag_id, task_id, execution_date) you get the same id (see the sketch below). The "uniqueness", and thus the idempotency, is handled by the logic written in the DAG.
>>>>
>>>> 2) Store the job id in some external storage (via one of the hooks, where it can be queried in a way that works for you). Then the idempotency is actually handled by the logic in your operator + some external storage.
>>>>
>>>> 3) Query your service and retrieve the job id from it - but you have to have a way to query for the job related to your "dag_id + task_id + execution_date". Then the idempotency is actually handled by the service you are using.
>>>>
>>>> In the use case you describe, this is the only thing you need: an "idempotency source". I believe you would like to get case 2) from above, but without having to use external storage to store the "unique id" - something that will let each task in the same dag run set or retrieve a unique value for that particular task. One value should be enough, assuming that each operator/task works on one external data "source".
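Option 1) above relies only on the DAG author deriving a deterministic id. A minimal sketch; the hashing scheme is illustrative and not taken from the linked PR:

```python
# Derive a job id that is stable across retries and re-runs of the same
# (dag_id, task_id, execution_date), so the external service can dedupe.
import hashlib


def deterministic_job_id(dag_id, task_id, execution_date):
    raw = f"{dag_id}__{task_id}__{execution_date.isoformat()}"
    # Hash to satisfy typical job-id length/character restrictions.
    return "airflow-" + hashlib.sha1(raw.encode("utf-8")).hexdigest()[:16]
```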
>>>> My current thinking is: why don't we provide such a dedicated idempotency service inside Airflow? We already have a DB, and we could have an "IdempotencyIdStorage" class with two methods:
>>>>
>>>> * .set(id: str) and
>>>> * .get() -> str
>>>>
>>>> The data stored there would be a string keyed by "(dag_id, task_id, execution_date)" - available also via Jinja templating. There is no intra-task communication here, and very little possibility of abuse, and it seems to solve the major pain point where you have to provide your own storage to get idempotency if your service does not provide one, or you do not want to delegate it to the DAG writer.
>>>>
>>>> J.
>>>>
>>>> On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:
>>>>
>>>> The use case I'm referring to is that you can't use xcom to let a task read information from its past attempts, because when a task starts, its xcom is automatically deleted.
>>>>
>>>> My specific use case is that we have a custom LivyOperator that calls Livy to start batch Spark jobs. When you start a batch job, Livy returns a job_id. Sometimes our operator can fail for one reason or another (for instance, if Livy is unreachable for a while). When the task retries, it calls Livy again, which starts the same Spark job. The problem is that the Spark job from the first attempt can still be running, and then we have a batch job that runs twice simultaneously and creates duplicates in the output.
>>>>
>>>> What we tried to do is get the job_id from the first try, to check if the job is still running and wait for it to complete if it is.
>>>>
>>>> We tried using xcom to let the task send a message to itself (to its next try), but xcom is meant for "inter-task communication" only, so this doesn't work and is not intended to work.
>>>>
>>>> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>>>>
>>>> Hi Furcy,
>>>>
>>>> Can you give a concrete example of what you mean by intra-task xcom? Depending on your use case, this may already be possible.
>>>>
>>>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I would like to open a feature request for Airflow to support "intra-task xcom".
>>>>
>>>> It seems that there are already several distinct use cases for it, and only ugly workarounds, and I wanted to list them in a JIRA ticket. I wanted to summarize links to the use cases and past attempts, and the recommended approach (which apparently would be to create a feature distinct from xcom to support this; it could be called intra-com or self-com?).
>>>>
>>>> Do you know if such a ticket already exists? I couldn't find one. Also, I can't create any ticket due to some obscure bug (see my other email).
>>>>
>>>> Thanks,
>>>>
>>>> Furcy
>>>>
>>>> --
>>>> Jarek Potiuk
>>>> Polidea <https://www.polidea.com/> | Principal Software Engineer
>>>> M: +48 660 796 129
>
> --
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
> M: +48 660 796 129
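Jarek's "IdempotencyIdStorage" proposal upthread amounts to very little code. A minimal sketch, dict-backed purely for illustration (a real version would live in the metastore and be exposed to tasks and templates):

```python
# Minimal sketch of the proposed IdempotencyIdStorage: one string per
# (dag_id, task_id, execution_date). Dict-backed here for illustration only.
class IdempotencyIdStorage:
    def __init__(self, dag_id, task_id, execution_date, backend=None):
        self._key = (dag_id, task_id, execution_date)
        # A real implementation would persist to the Airflow metastore.
        self._backend = backend if backend is not None else {}

    def set(self, id: str) -> None:
        # The value survives retries: nothing clears it when a task re-runs.
        self._backend[self._key] = id

    def get(self) -> str:
        return self._backend.get(self._key)
```

In Furcy's Livy case, the first try would call `storage.set(job_id)` right after submission, and a retry would call `storage.get()` to find and wait on the still-running job before submitting a new one.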
