I think we've discussed several approaches like that, and using the XCom
name (which for many people would mean "let's just extend the XCom table
for that") is not a very good idea IMHO. This is very different
functionality/logic which we might or might not agree to implement as a
community. Naming it "XCom" and trying to extend the XCom table behavior
might be problematic.

Not sure if you are aware, but we had a very similar discussion about this
recently (without clear conclusions, but at least you can see what kind of
issues/problems different people have with this approach):

https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E

I am not saying it is impossible to do, but I think it's a matter of how we
formulate the "use case". It's very tempting to implement a generic
intra-task communication mechanism, indeed. But it can very easily lead to
people abusing it and bypassing the guarantees (idempotency mainly) that
Airflow provides for backfilling and re-running tasks. I thought a bit
after the latest discussion died out, and I have one possible solution to
the problem.

Let me explain what I think about it (but others can have different
opinions, of course):

So far the discussion has been that there are several ways to achieve what
you want (and it's really about which entity provides the "idempotency"
guarantee):

1) Similarly to the just-merged BigQuery Insert Job
https://github.com/apache/airflow/pull/8868/files - you can provide the
job_id from outside. You'd need to work out a job_id naming scheme that
works in your case and make sure that when you re-run your task with the
same (dag_id, task_id, execution_date) you get the same id (see the sketch
after this list). Then the "uniqueness", and thus the idempotency, is
handled by the logic written in the DAG.

2) Store the job id in some external storage (via one of the hooks - where
it can be queried in a way that works for you). Then the idempotency is
handled by the logic in your Operator plus some external storage.

3) Query your service and retrieve the job id from it - but you have to
have a way to query for the job related to your "dag_id + task_id +
execution_date". Then the idempotency is actually handled by the service
you are using.
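
A rough sketch of what I mean in 1) by deriving the job_id in the DAG
(the helper name here is made up for illustration, not an existing Airflow
API; only the Jinja macros in the comment are standard ones):

    import hashlib

    def deterministic_job_id(dag_id: str, task_id: str,
                             execution_date: str) -> str:
        # Re-running the same (dag_id, task_id, execution_date) yields
        # the same id, so the external service can deduplicate the job.
        raw = f"{dag_id}__{task_id}__{execution_date}"
        # Hash to keep the id short and free of characters the service
        # might reject.
        return "airflow_" + hashlib.sha256(raw.encode()).hexdigest()[:32]

    # Or directly via a templated field in the DAG file, e.g.:
    # job_id="{{ dag.dag_id }}_{{ task.task_id }}_{{ ds_nodash }}"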

In the use case you describe, this is the only thing you need - an
"idempotency source". I believe you would like to get case 2) from above,
but without having to use external storage to store the "unique id" -
something that lets each task in the same dag run set or retrieve a unique
value for that particular task. One value should be enough, assuming that
each operator/task works on one external data "source".

My current thinking is:

Why don't we provide such a dedicated idempotency service inside Airflow?
We already have a DB, and we could have an "IdempotencyIdStorage" class
with two methods:

  * .set(id: str) and
  * .get() -> str

And the data stored there should be a string keyed by (dag_id, task_id,
execution_date) - available also via Jinja templating (see the sketch
below). There is no intra-task communication here, very little possibility
of abuse, and it seems to solve the major pain point where you have to
provide your own storage to get idempotency if your service does not
provide one, or you do not want to delegate it to the DAG writer.
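
To make the idea concrete, a rough sketch of how such storage could look
(everything here is an assumption for illustration - the model, the keying
and the session handling are made up, not existing Airflow code):

    from airflow.models.base import Base
    from airflow.utils.session import provide_session
    from sqlalchemy import Column, DateTime, String

    class IdempotencyId(Base):
        # Hypothetical table: one value per (dag_id, task_id, execution_date).
        __tablename__ = "idempotency_id"

        dag_id = Column(String(250), primary_key=True)
        task_id = Column(String(250), primary_key=True)
        execution_date = Column(DateTime, primary_key=True)
        value = Column(String(250))

    class IdempotencyIdStorage:
        # Scoped to a single task instance, so .set()/.get() cannot be
        # used for inter-task communication.
        def __init__(self, dag_id, task_id, execution_date):
            self.dag_id = dag_id
            self.task_id = task_id
            self.execution_date = execution_date

        @provide_session
        def set(self, id: str, session=None):
            # merge() inserts or overwrites the single row for this key.
            session.merge(IdempotencyId(
                dag_id=self.dag_id,
                task_id=self.task_id,
                execution_date=self.execution_date,
                value=id,
            ))

        @provide_session
        def get(self, session=None) -> str:
            row = session.query(IdempotencyId).get(
                (self.dag_id, self.task_id, self.execution_date))
            return row.value if row else None

On retry, the LivyOperator from the use case below could then call .get()
to find the batch id stored by the previous attempt and wait for that job
instead of submitting a new one.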

J.


On Mon, Jun 1, 2020 at 2:12 PM Furcy Pin <[email protected]> wrote:

> The use case I'm referring to is that you can't use xcom to let a task
> read information from its past attempts, because when a task starts its
> xcom is automatically deleted.
>
> My specific use case is that we have a custom LivyOperator that calls Livy
> to start batch Spark jobs.
> When you start a batch job, Livy returns a job_id.
> Sometimes our operator can fail for one reason or another (for instance if
> Livy is unreachable for a while).
> When the task retries, it calls Livy again, which starts the same Spark
> job, but the problem is that the Spark job from the first attempt can still
> be running,
> and then we have a batch job that runs twice simultaneously and creates
> duplicates in the output.
>
> What we tried to do is get the job_id from the first try, to check if
> the job is still running, and wait for it to complete if it is.
>
> We tried using xcom to let the task send a message to itself (to its next
> try), but xcom is meant for "inter-task communication" only, so this
> doesn't work and is not intended to work.
>
>
>
>
>
> On Mon, 1 Jun 2020 at 13:15, Ash Berlin-Taylor <[email protected]> wrote:
>
>> Hi Furcy,
>>
>> Can you give a concrete example of what you mean by intra-task xcom?
>> Depending on your use case, this may already be possible.
>>
>> On Jun 1 2020, at 11:45 am, Furcy Pin <[email protected]> wrote:
>>
>> Hello,
>>
>> I would like to open a feature request for Airflow to support "intra-task
>> xcom".
>>
>> It seems that there are already several distinct use cases for it,
>> with only ugly workarounds, and I wanted to list them in a JIRA ticket.
>>
>> I wanted to summarize links to the use cases and past attempts,
>> and the recommended approach (which apparently would be to create
>> a distinct feature from xcom to support this; it could be called
>> intra-com or self-com?)
>>
>> Do you know if such ticket already exists? I couldn't find one.
>> Also I can't create any ticket due to some obscure bug (see my other
>> email).
>>
>> Thanks,
>>
>> Furcy
>>
>>

-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129