Definitely, let's connect.

On Wed, Jul 22, 2020 at 11:26 PM Daniel Standish <[email protected]> wrote:
We are using state persistence pretty heavily right now, with plugin models that I have called ProcessState and TaskState.

Our implementation might be too idiosyncratic to contribute to Airflow, but then again it might not. I would be happy to do a call to demo what we are doing, to see if there is any interest and to receive guidance from interested parties on what, if anything, might make sense in Airflow. Kaxil, do you have any interest in that?

On Thu, Jun 4, 2020 at 4:49 PM Kaxil Naik <[email protected]> wrote:

I definitely feel we can support these use cases by improving XCom. The concept of XCom was to allow sharing messages & state between tasks.

Here is the first line from the docs about XCom:

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”.

I read the AIP (https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence) from @Daniel Standish <[email protected]>. The "namespacing" of this state would be a good feature; XCom already allows that with the "dag_id, task_id" key. *Solution 1* in the AIP would solve the issue without much impact and while maintaining backwards-compatibility.

I am against the idea of using the Secrets Backend for storing "State". State stored for this kind of persistence should be short-lived and temporary.

The "writers" & "readers" of the two (Secrets & State) are different. Generally, sysadmins / team leads are responsible for managing secrets (writing, rotating, auditing, etc.), whereas State is written by Airflow workers, would (or should) be short-lived, and you don't care about auditing or rotating its values.

The only problems that I can see in the current XCom implementation are (1) the use of execution_date and (2) the fact that XComs are cleared at the start of a task.

One of the issues we already want to address in Airflow is removing the hard requirement of "execution_date" for DagRun and TaskInstance. That would also help in fixing (1) above.

(2) can be solved by a flag, as mentioned in the AIP.

Regards,
Kaxil

On Tue, Jun 2, 2020 at 8:05 AM Jarek Potiuk <[email protected]> wrote:

I think this subject has come up so often that I am also slowly changing my mind in favor of making an explicit state persistence "service".

Whether it's only one key or more is secondary, I think; but if users are already using Variables to keep state for tasks, this is a clear sign that we are missing a crucial feature and our users are already abusing Airflow in exactly the way we try to prevent by not introducing a "State service".

With the recent Secrets Backend implementation, where Variables might be kept in a secrets backend - not only the metastore - you potentially have no write access to the backend. There is not even "write" support in the current "MetastoreBackend" implementation for variables. So we already have configurations where writing a variable and reading it elsewhere might not work, as far as I can see. You can of course configure several backends with the metastore as the last fallback, but for me that opens up a different problem: what happens if the key is present in both, one task writes it to the metastore, but another task reads it from the secrets backend?
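(As a minimal illustration of the asymmetry described here - assuming a secrets backend such as AWS SSM is configured in airflow.cfg, and with a made-up variable key - reads and writes of a Variable can target different stores:)

    from airflow.models import Variable

    # Writes always go to the Airflow metastore database.
    Variable.set("watermark", "2020-06-01")

    # Reads consult the configured secrets backends first and only fall back
    # to the metastore, so if "watermark" also exists in the secrets backend
    # this may return a different (stale) value than the one just written.
    value = Variable.get("watermark")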
I think variables are being abused in exactly the way we want to prevent a "StateService" from being abused - and shying away from that is really like closing our eyes and pretending it's not happening.

So maybe we can make a change AIP with this approach:

1) Variables -> mostly read-only (for tasks) and used to keep configuration shared between workers (but not at a task level).
2) StateService (or whatever we call it), where we keep state information for a specific dag + task + execution_date.

J.

On Tue, Jun 2, 2020 at 12:13 AM Daniel Standish <[email protected]> wrote:

Airflow already provides a mechanism for state persistence: the Variable, and, with caveats and flaws, XCom.

I personally persist state to the Airflow metastore database for a large percentage of our jobs. They are incremental jobs and it is helpful to keep track of a watermark.

I think that incremental jobs are probably very, very common in Airflow implementations, though users probably often resort to imperfect vehicles for this such as `execution_date` or XCom.

I have a very drafty draft AIP that I haven't had enough time to work on, which explores adding explicit support for state persistence to Airflow: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-30%3A+State+persistence. Though I understand it is a controversial idea. (Note: the AIP is not ready for primetime.)

I am of the mind that being able to persist some state is not a fundamental change to Airflow; it would just add explicit (and more user-friendly) support for something that is already quite common and fits fully within the wheelhouse of what Airflow exists to do.

On Mon, Jun 1, 2020 at 2:57 PM Chris Palmer <[email protected]> wrote:

Furcy,

To clarify, when I say that Airflow should not be in the business of keeping state about external systems, I specifically mean it shouldn't be keeping state to be shared between task instances. I completely understand that there may be external systems that are harder to work with and that, as in your case, require the operator to be able to store some piece of information to make them idempotent. I just don't think that Airflow should provide that storage mechanism.

I would think that most users of Airflow have access to some sort of cloud storage like S3 (which is really just a key-value store), and it's easy enough to write your job_id or whatever value you care about to a file with a prefix computed from the dag_id, task_id, execution_date or whatever combination of them you care about. Yes, it makes your operators more complex and they have to know about another system, but it keeps that complexity out of core Airflow. That's the trade-off.

Ash,

I'm not suggesting that XCom be removed from Airflow, and I understand there are use cases where it makes some things convenient. In your example though, it would be just as easy for the sensor to write the found object path as the contents of another file in S3, with a computable prefix based on the dag/task/execution_date.
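(A rough sketch of the external-storage pattern Chris describes, using the stock S3Hook; the bucket name, key layout and helper names are illustrative assumptions, not anything from the thread:)

    from airflow.hooks.S3_hook import S3Hook

    def _state_key(dag_id, task_id, execution_date):
        # One object per (dag_id, task_id, execution_date) - the "namespacing"
        # is just the S3 key prefix.
        return f"airflow-state/{dag_id}/{task_id}/{execution_date.isoformat()}/job_id"

    def save_job_id(context, job_id, bucket="my-state-bucket"):
        hook = S3Hook(aws_conn_id="aws_default")
        key = _state_key(context["dag"].dag_id, context["task"].task_id,
                         context["execution_date"])
        hook.load_string(job_id, key=key, bucket_name=bucket, replace=True)

    def load_job_id(context, bucket="my-state-bucket"):
        hook = S3Hook(aws_conn_id="aws_default")
        key = _state_key(context["dag"].dag_id, context["task"].task_id,
                         context["execution_date"])
        if not hook.check_for_key(key, bucket_name=bucket):
            return None
        return hook.read_key(key, bucket_name=bucket)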
At its heart XCom is just a key-value store where the keys are limited to a very specific set of possibilities, and where key-value pairs are managed in some specific ways. The request here is to add another narrowly defined set of allowable keys, and as far as I can tell with no extra management of them. The only real advantage of using the Airflow database for XCom, or any expansion/variation on it, is that we know that all operators have access to the database.

I'm not an expert, but I would wonder how well Postgres or MySQL perform as high-volume key-value stores. Does anyone actually use XCom at scale, and does that extra load on the database impact scheduling and other performance aspects of Airflow?

Chris

On Mon, Jun 1, 2020 at 5:03 PM Ash Berlin-Taylor <[email protected]> wrote:

Just to touch on one point about XCom, and to reassure people: they, or something very like them, are in Airflow for the foreseeable future.

As an example of an appropriate use for XCom: let's say a third party delivers you a set of files once a week, but the exact names of the files aren't known (by you) in advance. So you write a sensor that polls/checks S3 for the objects to appear in your bucket, and the sensor outputs the S3 object path to XCom, which the next processing step then examines to process the files.

That sort of use case is not going anywhere.

Cheers,
-ash

On Jun 1 2020, at 7:37 pm, Chris Palmer <[email protected]> wrote:

At the risk of repeating myself (from the previous thread that touched on this topic), I don't think Airflow should be in the business of keeping state about external systems. Airflow is about authoring and running workflows; it's not a messaging tool or a cluster management tool. I'm not convinced that the existing XCom functionality should really be a part of Airflow, and I certainly don't think it should be expanded upon or have new variations added. I think storing state is especially risky, if for no other reason than the fact that Airflow is not the source of truth about those systems. It's very likely that at times the "state" that Airflow has saved will diverge from the actual state of the external system. Handling that nicely probably requires a bunch of custom code in the operators/hooks anyway, so I don't think it saves anything in terms of operator code complexity. Users would be much better served going to the source of truth to determine state. If part of the problem is that Livy is lacking in features (like being able to query the status of a particular job_id), then I think it would be more appropriate to add the needed features to that project. Airflow at its core shouldn't be concerned with making up for failures of other tools.

Also, as can be seen by just this discussion, it's hard to keep these extra features from expanding in scope. Jarek proposed something that would just store a single string, and immediately Furcy wants to expand it to store multiple strings.
Either way, we are really just talking about a key-value store and putting limits on how that key can be structured; the key is made up of some predefined set of Airflow entities (for Jarek's proposal) or some arbitrary key along with those Airflow entities (Furcy's proposal).

I know in the past I had a situation where I wanted to reuse a cluster across multiple data intervals, if one was already running (this was before I discovered Airflow, so it wasn't "execution dates" precisely). I can equally see use cases where I might want to share some resource across multiple tasks in a DAG, or across similar tasks in multiple DAGs. So if we added this, then why limit it to any one of those combinations? But then we just have an arbitrary key-value store. If you want to use Airflow for that then you can use Variables; if you want to use something else then you can.

Unless Airflow is doing some extra management of these key-values in some way (like it does with clearing out XComs on reruns), I see absolutely no added benefit. And even with some potential management by Airflow, I'm still not convinced that Airflow is the right place for it.

Chris

On Mon, Jun 1, 2020 at 1:19 PM Furcy Pin <[email protected]> wrote:

Thank you Jarek for the detailed explanation,

That's exactly what I wanted to do: write a feature request to summarize all those discussions. I agree with you that the feature should be kept distinct from the XCom feature and that we should not piggyback this feature onto XCom.

The crux of the problem, I think, is that with XCom you do want the task to delete its XCom at the beginning of a retry. Correct me if I'm wrong, but one use case where this was necessary was having a task A and a task B that starts immediately after A and waits for some 'signal' from A. If A and B restart and A doesn't reset its signal, then B will use the signal from A's first try, which is incorrect.

About the 3 solutions you mention:

1) Providing the job_id from outside. That works indeed. Sadly, in my use case Livy's API is poorly designed and only returns a generated job_id; you can't specify a custom one. You can't even find a job by name: I would have to list all the active job_ids and do a GET for each of them to get its name and find which one is the one I want. It's doable but inelegant.

2) Storing the id in an external storage. Of course that would work, but it requires an external storage. More on that below.

3) I'm not sure I understand completely what you mean there, but I think you mean that the idempotency can be handled by the service you call (for instance BigQuery). Indeed, that is another solution. If we were using Spark with a Hive metastore + locking, or the Delta Lake storage format, we could have something to prevent a job that runs twice from creating duplicates. This is another solution we are considering, but it is costly to change now.

You guessed correctly: the feature I am asking for would be some utility that lets users implement solution 2) without requiring an external storage.
I think it would be a QOL improvement for some use cases, just like it could be argued that XCom is just a QOL improvement and users could have used an external storage themselves. The main advantage it brings is making custom operators much easier to share and reuse across the Apache Airflow community, compared to having to set up some external storage.

I have seen that some users use the metadata store itself as an external storage, by adding a new table to the Airflow model:
http://mail-archives.apache.org/mod_mbox/airflow-dev/201809.mbox/%3ccaerdx9ektwu5urq+pnq_8q-hb-nhtfnq_xwkggpxvo4mhb_...@mail.gmail.com%3e

And others suggested using XCom itself as an external storage, by storing information with a special task_id:
https://stackoverflow.com/a/57515143/2087478

In the discussion thread you provided, it was also suggested to use Variables to store some persisting information.

These 3 approaches work but feel quite "hacky", and I believe that providing such functionality would be good.

Finally, I don't see the point of limiting the functionality to such an extent: providing an "IdempotencyIdStorage" that only allows you to store a string will just force people who need to store more than one id for one task (for whatever reason) to use some hack again, like storing JSON inside the storage.

I was thinking more about something quite similar to XCom (I liked the XState name suggestion), where the entry would be keyed by "(dag_id, task_id, execution_date, key)", where "key" can be whatever you want and would be kept across retries.

I have read (quickly) through the "Pandora's Box" thread you linked. Indeed, it looks like there would be many ways to misuse such a feature. I do understand the importance of idempotency, and it looks like my use case is one of the first ever listed where I do need to persist state across retries to make my operator really idempotent.

I'm surprised no one came up with it before, given how frequent the Spark + Airflow combination is (well, the BigQueryOperator was one too, but it found another solution).

Of course we can blame it on Livy for being poorly conceived (unlike BigQuery), or we can blame it on Spark for not having a built-in safety mechanism to prevent double-writes, but I think that, as the above hacks show, you can't really prevent users from shooting themselves in the foot if that's what they really want to do.

While I do think that making things foolproof is important, I believe it's also in Python's philosophy to *not* make things foolproof to the detriment of simplicity for the right use cases. But I do understand that the use cases are different and contradictory: some would require the state to be persisted across reschedules and not retries, mine would require the state to be persisted across retries and not reschedules.

Maybe the Airflow-y way to do that would be to have one task that does the submit and pushes an XCom with the job id, then another task that checks the progress of the job, but it feels very cumbersome to double the number of tasks just for that.
Plus, I'm not sure we could make the first task retry if the second task fails...

Thanks again,

Furcy

On Mon, 1 Jun 2020 at 16:01, Jarek Potiuk <[email protected]> wrote:

I think we've discussed several approaches like that, and using the XCom name (which for many people would mean "let's just extend the XCom table for that") is not a very good idea IMHO. I think this is very different functionality/logic which we might or might not agree to implement as a community. Naming it "XCom" and trying to extend the XCom table's behavior might be problematic.

Not sure if you are aware, but we had a very similar discussion about it recently (without clear conclusions, but at least you can see what kinds of issues/problems different people have with this approach):
https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E

I am not saying it is impossible to do, but I think it's a matter of how we formulate the "use case". It's very tempting to implement a generic intra-task communication mechanism, indeed. But it can very easily lead to people abusing it and bypassing the guarantees (idempotency mainly) that Airflow provides for backfilling and re-running tasks. I thought a bit after the latest discussion kind of died out, and I have one possible solution to the problem.

Let me explain what I think about it (but others can have different opinions of course).

So far the discussion was that there are several ways to achieve what you want (and it's really about which entity is providing the "idempotency" guarantee):

1) Similarly to what was just merged in the BigQuery Insert Job (https://github.com/apache/airflow/pull/8868/files), you can provide the job_id from outside. You'd need to work out the job_id naming that works in your case and make sure that when you re-run your task with the same (dag_id, task_id, execution_date) you get the same id. Then the "uniqueness", and thus the idempotency, is handled by the logic written in the DAG.

2) Store the id in some external storage (via one of the hooks, where it can be queried in a way that works for you). Then the idempotency is handled by the logic in your Operator + some external storage.

3) Query your service and retrieve the job id from it - but you have to have a way to query for the job related to your "dag_id + task_id + execution_date". Then the idempotency is handled by the service you are using.

In the use case you describe, this is the only thing you need - an "idempotency source". I believe you would like to get case 2) from above, but without having to use external storage to store the "unique id". Something that will let each task in the same dag run set or retrieve a unique value for that particular task. One value should be enough, assuming that each operator/task works on one external data "source".

My current thinking is:

Why don't we provide such a dedicated idempotency service inside Airflow?
We already have a DB, and we could have an "IdempotencyIdStorage" class with two methods:

* .set(id: str) and
* .get() -> str

The data stored there would be a string keyed by "(dag_id, task_id, execution_date)" - available also via Jinja templating. There is no intra-task communication here, very little possibility of abuse, and it seems to solve the major pain point where you have to provide your own storage to get idempotency if your service does not provide one or you do not want to delegate it to the DAG writer.

J.

On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:

The use case I'm referring to is that you can't use XCom to let a task read information from its past attempts, because when a task starts, its XCom is automatically deleted.

My specific use case is that we have a custom LivyOperator that calls Livy to start batch Spark jobs. When you start a batch job, Livy returns a job_id. Sometimes our operator can fail for one reason or another (for instance if Livy is unreachable for a while). When the task retries, it calls Livy again, which starts the same Spark job; but the problem is that the Spark job from the first attempt can still be running, and then we have a batch job that runs twice simultaneously and creates duplicates in the output.

What we tried to do is get the job_id from the first try, check if that job is still running, and wait for it to complete if it is.

We tried using XCom to let the task send a message to itself (to its next try), but XCom is meant for "inter-task communication" only, so this doesn't work and is not intended to work.

On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:

Hi Furcy,

Can you give a concrete example of what you mean by intra-task xcom? Depending on your use case this may already be possible.

On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:

Hello,

I would like to open a feature request for Airflow to support "intra-task xcom".

It seems that there are already several distinct use cases for it, and only ugly workarounds, and I wanted to list them in a JIRA ticket. I wanted to summarize links to the use cases and past attempts, and the recommended approach (which apparently would be to create a feature distinct from XCom to support this; it could be called intra-com or self-com?).

Do you know if such a ticket already exists? I couldn't find one. Also, I can't create any ticket due to some obscure bug (see my other email).

Thanks,

Furcy

--

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
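(For concreteness, a minimal sketch of what the "IdempotencyIdStorage" proposed above might look like if backed by the metastore. Only the class name and the .set()/.get() signatures come from the thread; the table name, columns and session handling below are assumptions, not an agreed design:)

    from airflow import settings
    from airflow.models.base import Base
    from sqlalchemy import Column, DateTime, String, Text

    class IdempotencyId(Base):
        """One row per (dag_id, task_id, execution_date) - survives retries."""
        __tablename__ = "idempotency_id"  # hypothetical table

        dag_id = Column(String(250), primary_key=True)
        task_id = Column(String(250), primary_key=True)
        execution_date = Column(DateTime, primary_key=True)
        value = Column(Text)

    class IdempotencyIdStorage:
        """Sketch of the .set()/.get() service described in the thread."""

        def __init__(self, dag_id, task_id, execution_date):
            self.dag_id = dag_id
            self.task_id = task_id
            self.execution_date = execution_date

        def set(self, id: str) -> None:
            session = settings.Session()
            try:
                # merge = insert-or-update on the composite primary key
                session.merge(IdempotencyId(dag_id=self.dag_id,
                                            task_id=self.task_id,
                                            execution_date=self.execution_date,
                                            value=id))
                session.commit()
            finally:
                session.close()

        def get(self) -> str:
            session = settings.Session()
            try:
                row = (session.query(IdempotencyId)
                       .filter_by(dag_id=self.dag_id, task_id=self.task_id,
                                  execution_date=self.execution_date)
                       .one_or_none())
                return row.value if row else None
            finally:
                session.close()

(In the Livy scenario described in the thread, an operator's execute() could call .get() at the start of a retry to recover the batch id submitted by a previous attempt before deciding whether to submit a new job.)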
