niraja b created AIRFLOW-2327:
---------------------------------

             Summary: Cannot pickle PythonOperator dags using Mesos Executor
                 Key: AIRFLOW-2327
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2327
             Project: Apache Airflow
          Issue Type: Bug
          Components: contrib
    Affects Versions: Airflow 1.9.0
         Environment: prod
            Reporter: niraja b


We are using the MesosExecutor with Airflow.

BashOperator and SimpleHttpOperator work for us.

The scheduler is started with -p to pickle the DAGs.

The issue is with the following sample code. We tried PythonOperator and
PythonVirtualenvOperator, both with and without use_dill, but could not get
either to run successfully on the agent.

 

from __future__ import print_function
from airflow.models import DAG
from datetime import timedelta, datetime
from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator

DAG_ID = "testdag"


DEFAULT_ARGS = {
    "start_date": datetime(2018, 4, 16, 1, 50, 16),
    "schedule_interval": None,
    "dagrun_timeout": timedelta(minutes=60),
    "email": ['t...@test.com'],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=5),
}

def _testlambda(**kwargs):
    print("hello world")

with DAG(dag_id=DAG_ID, default_args=DEFAULT_ARGS) as dag:
    (
        PythonVirtualenvOperator(
            task_id='python_1',
            python_callable=_testlambda,
            use_dill=True,
            requirements=['dill']
        )
    )

 

Error 

 

Traceback (most recent call last):
  File "/usr/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 358, in run
    DagPickle).filter(DagPickle.id == args.pickle).first()
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2789, in first
    ret = list(self[0:1])
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2581, in __getitem__
    return list(res)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 137, in instances
    util.raise_from_cause(err)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 102, in instances
    logging.debug(str(fetch[0]))
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/result.py", line 156, in __repr__
    return repr(sql_util._repr_row(self))
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/util.py", line 329, in __repr__
    ", ".join(trunc(value) for value in self.row),
  File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1588, in process
    return loads(value)
  File "/usr/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
    return load(file)
  File "/usr/lib/python2.7/site-packages/dill/dill.py", line 288, in load
    obj = pik.load()
  File "/usr/lib64/python2.7/pickle.py", line 858, in load
    dispatch[key](self)
  File "/usr/lib64/python2.7/pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "/usr/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
    return StockUnpickler.find_class(self, module, name)
  File "/usr/lib64/python2.7/pickle.py", line 1124, in find_class
    __import__(module)
ImportError: No module named unusual_prefix_ac646764c974ff68b827793414d8eabcdca720cf_dmitrydag

I0416 11:22:34.367975 47476 executor.cpp:938] Command exited with status 1 (pid: 47482)

I0416 11:22:35.371712 47481 process.cpp:887] Failed to accept socket: future discarded
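
For anyone trying to reproduce the ImportError outside Airflow: the traceback ends in find_class calling __import__ on a module name stored in the pickle. The sketch below is only an illustration of that mechanism, using the standard pickle module rather than dill, and a made-up module name (unusual_prefix_example_dag) in place of the generated one. It does not reproduce the Airflow or Mesos code path, and whether this is the actual root cause here is an assumption.

```python
import pickle
import sys
import types

# Hypothetical stand-in for a DAG file imported under a generated module name.
mod = types.ModuleType("unusual_prefix_example_dag")
exec("def hello():\n    return 'hello world'\n", mod.__dict__)
sys.modules[mod.__name__] = mod

# A module-level function is pickled by reference: the payload holds the
# module name and the function name, not the function body.
payload = pickle.dumps(mod.hello)

# Simulate a second process (e.g. the agent) where that module was never imported.
del sys.modules[mod.__name__]

try:
    pickle.loads(payload)
    error = None
except ImportError as exc:
    # Unpickling tries to import the module by name and fails.
    error = exc

print(type(error).__name__, error)
```

If the same thing is happening with the pickled DAG, the agent would need to be able to import the DAG module under the recorded name for the unpickle step in cli.py to succeed.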

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
