ivan de los santos created AIRFLOW-6033:
-------------------------------------------

             Summary: UI crashes at "Landing Times" after changing task_id letter case
                 Key: AIRFLOW-6033
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6033
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG, ui
    Affects Versions: 1.10.6
            Reporter: ivan de los santos


The Airflow UI will crash in the browser, returning the "Oops" message together with the traceback of the error.

This is caused by changing the letter case of a task_id. Some examples that will cause Airflow to crash:
 - task_id = "DUMMY_TASK" to task_id = "dUMMY_TASK"
 - task_id = "Dummy_Task" to task_id = "dummy_Task" or "Dummy_task", ...
 - task_id = "Dummy_task" to task_id = "Dummy_tASk"

_____________________________________

If you change the task_id to something different altogether, for example:
 - task_id = "Dummy_Task" to task_id = "DummyTask" or "Dummytask"

it won't fail, since the task is recognized as a new task, which is the expected behaviour.

If we switch the modified name back to the original name, it won't crash either, since it will access the correct task instances. The following paragraphs explain where this error is located.

_____________________________________________

 *How to replicate*: 
 # Launch airflow webserver -p 8080
 # Go to the Airflow UI
 # Create an example DAG with a task_id of your choice in small letters (e.g. "run")
 # Trigger the DAG and wait for its execution to finish
 # Modify the task_id inside the DAG file, changing the first letter to a capital letter (e.g. "Run")
 # Refresh the DAG
 # Go to "Landing Times" inside the DAG menu in the UI
 # You will get an "Oops" message with the traceback.
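The failure mode behind these steps can be simulated with a plain dictionary, independent of Airflow. This is only a sketch; the names `dag_task_ids` and `stored_instance_ids` are hypothetical stand-ins for the DAG file and the metadata database:
{code:python}
# Sketch of the failure mode: the chart dictionaries are keyed by the
# task_ids currently defined in the DAG file, while the task instances
# read back from the database still carry the pre-rename task_id.
dag_task_ids = ["Run"]          # task_id after the rename (DAG file)
stored_instance_ids = ["run"]   # task_id of past runs (metadata DB)

x = {task_id: [] for task_id in dag_task_ids}

for ti_task_id in stored_instance_ids:
    try:
        x[ti_task_id].append("2019-11-01T00:00:00")
    except KeyError as err:
        print("KeyError:", err)  # the same KeyError: 'run' as the "Oops" page
{code}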

 

*File causing the problem*: [https://github.com/apache/airflow/blob/master/airflow/www/views.py] (lines 1643 - 1654)

 

*Reasons of the problem*:
 # KeyError: 'run', meaning the dictionary lookup does not find the task_id "run". The traceback below shows where this comes from.

{code:python}
Traceback (most recent call last):
  File "/home/rde/.local/lib/python3.6/site-packages/flask/app.py", line 2446, 
in wsgi_app
    response = self.full_dispatch_request()
  File "/home/rde/.local/lib/python3.6/site-packages/flask/app.py", line 1951, 
in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/rde/.local/lib/python3.6/site-packages/flask/app.py", line 1820, 
in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/rde/.local/lib/python3.6/site-packages/flask/_compat.py", line 
39, in reraise
    raise value
  File "/home/rde/.local/lib/python3.6/site-packages/flask/app.py", line 1949, 
in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/rde/.local/lib/python3.6/site-packages/flask/app.py", line 1935, 
in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/rde/.local/lib/python3.6/site-packages/flask_admin/base.py", line 
69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/home/rde/.local/lib/python3.6/site-packages/flask_admin/base.py", line 
368, in _run_view
    return fn(self, *args, **kwargs)
  File "/home/rde/.local/lib/python3.6/site-packages/flask_login/utils.py", 
line 258, in decorated_view
    return func(*args, **kwargs)
  File "/home/rde/.local/lib/python3.6/site-packages/airflow/www/utils.py", 
line 295, in wrapper
    return f(*args, **kwargs)
  File "/home/rde/.local/lib/python3.6/site-packages/airflow/utils/db.py", line 
74, in wrapper
    return func(*args, **kwargs)
  File "/home/rde/.local/lib/python3.6/site-packages/airflow/www/views.py", 
line 1921, in landing_times
    x[ti.task_id].append(dttm)
KeyError: 'run'

{code}
_____________________________
h2. Code
{code:python}
for task in dag.tasks:
    y[task.task_id] = []
    x[task.task_id] = []

    for ti in task.get_task_instances(start_date=min_date, end_date=base_date):

        ts = ti.execution_date
        if dag.schedule_interval and dag.following_schedule(ts):
            ts = dag.following_schedule(ts)
        if ti.end_date:
            dttm = wwwutils.epoch(ti.execution_date)
            secs = (ti.end_date - ts).total_seconds()
            x[ti.task_id].append(dttm)
            y[ti.task_id].append(secs)

{code}
 
In the first two lines inside the outer for loop, we can see how the dictionaries x and y are keyed by the task_id attributes coming from the current DAG definition.

*The problem actually comes in the second for loop*, where the task instances are fetched for each task. I am not entirely sure about the next part and would appreciate clarification.

I think the task instances (ti) returned by the get_task_instances() function come from the information stored in the database. That would explain the crash when you access the "Landing Times" page: x and y were keyed with the current task_id from the DAG, while the stored task instances carry the old task_id under a different letter case, so the dictionary lookup fails.



One of my main questions is: given a changed task name (such as "run" to "Run"), how does get_task_instances() keep returning past task instances under the old name, i.e. asking for instances of "Run" but getting back task instances (ti) with task_id "run"?
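One plausible answer (an assumption on my side, to be confirmed by someone who knows the metadata schema) is that the database compares task_id case-insensitively, as happens under MySQL's default `*_ci` collations; a query filtering on "Run" would then also match rows stored as "run". The effect can be sketched with sqlite's built-in NOCASE collation:
{code:python}
import sqlite3

# Sketch: with a case-insensitive collation on the task_id column,
# filtering on the renamed id still returns the old rows unchanged.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT COLLATE NOCASE)")
conn.execute("INSERT INTO task_instance VALUES ('run')")  # pre-rename row

rows = conn.execute(
    "SELECT task_id FROM task_instance WHERE task_id = 'Run'"
).fetchall()
print(rows)  # [('run',)]
{code}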



________________________


*Proposed solution*: I propose storing the task_id of the current task from the dag.tasks loop in a variable, and reusing it both when creating the dictionary entries and when appending the task instances' time values (dttm and secs). This works because the task instances (ti) are retrieved through the task that actually asks for them (get_task_instances), so they all belong to that task.

 
{code:python}
for task in dag.tasks:
    # Change proposed is HERE

    task_id = task.task_id
    y[task_id] = []
    x[task_id] = []
    for ti in task.get_task_instances(start_date=min_date, end_date=base_date):
        ts = ti.execution_date
        if dag.schedule_interval and dag.following_schedule(ts):
            ts = dag.following_schedule(ts)
        if ti.end_date:
            dttm = wwwutils.epoch(ti.execution_date)
            secs = (ti.end_date - ts).total_seconds()

            # And HERE
            x[task_id].append(dttm)
            y[task_id].append(secs)
{code}
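A complementary hardening (my own suggestion, not part of the patch above, and it masks the mismatch rather than fixing it) would be to build x and y as defaultdicts, so an unexpected task_id coming back from the database degrades to an extra series instead of crashing the page:
{code:python}
from collections import defaultdict

# Sketch: defaultdict(list) creates missing keys on first access, so a
# task_id that only exists in the database no longer raises KeyError.
x = defaultdict(list)
y = defaultdict(list)

x["Run"]                 # key for the task currently defined in the DAG
x["run"].append("dttm")  # stale task_id from the DB: no crash

print(sorted(x))  # ['Run', 'run']
{code}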
 
The patch above fixed the problem for me and my team.

I am willing to dig deeper into this issue if the problem requires it, or to contribute my solution.
 

Best regards,

Iván



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
