niraja b created AIRFLOW-2327:
---------------------------------

             Summary: Cannot pickle PythonOperator dags using Mesos Executor
                 Key: AIRFLOW-2327
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2327
             Project: Apache Airflow
          Issue Type: Bug
          Components: contrib
    Affects Versions: Airflow 1.9.0
         Environment: prod
            Reporter: niraja b
We are using Airflow's MesosExecutor. BashOperator and SimpleHTTPOperator work for us, and the scheduler is started with -p so that DAGs are pickled.

The problem is with the sample code below. We tried both PythonOperator and PythonVirtualenvOperator, each with and without use_dill, but could not get the task to run successfully on the agent.

    from __future__ import print_function
    from airflow.models import DAG
    from datetime import timedelta, datetime
    from airflow.operators.python_operator import PythonOperator, PythonVirtualenvOperator

    DAG_ID = "testdag"
    DEFAULT_ARGS = {
        "start_date": datetime(2018, 4, 16, 1, 50, 16),
        "schedule_interval": None,
        "dagrun_timeout": timedelta(minutes=60),
        "email": ['t...@test.com'],
        "email_on_failure": True,
        "email_on_retry": False,
        "retries": 3,
        "retry_delay": timedelta(seconds=5),
    }

    def _testlambda(**kwargs):
        print("hello world")

    with DAG(dag_id=DAG_ID, default_args=DEFAULT_ARGS) as dag:
        (
            PythonVirtualenvOperator(
                task_id='python_1',
                python_callable=_testlambda,
                use_dill=True,
                requirements=['dill']
            )
        )

Error:

    Traceback (most recent call last):
      File "/usr/bin/airflow", line 27, in <module>
        args.func(args)
      File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 358, in run
        DagPickle).filter(DagPickle.id == args.pickle).first()
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2789, in first
        ret = list(self[0:1])
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 2581, in __getitem__
        return list(res)
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 137, in instances
        util.raise_from_cause(err)
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
        reraise(type(exception), exception, tb=exc_tb, cause=cause)
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/loading.py", line 102, in instances
        logging.debug(str(fetch[0]))
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/result.py", line 156, in __repr__
        return repr(sql_util._repr_row(self))
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/util.py", line 329, in __repr__
        ", ".join(trunc(value) for value in self.row),
      File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1588, in process
        return loads(value)
      File "/usr/lib/python2.7/site-packages/dill/dill.py", line 299, in loads
        return load(file)
      File "/usr/lib/python2.7/site-packages/dill/dill.py", line 288, in load
        obj = pik.load()
      File "/usr/lib64/python2.7/pickle.py", line 858, in load
        dispatch[key](self)
      File "/usr/lib64/python2.7/pickle.py", line 1090, in load_global
        klass = self.find_class(module, name)
      File "/usr/lib/python2.7/site-packages/dill/dill.py", line 445, in find_class
        return StockUnpickler.find_class(self, module, name)
      File "/usr/lib64/python2.7/pickle.py", line 1124, in find_class
        __import__(module)
    ImportError: No module named unusual_prefix_ac646764c974ff68b827793414d8eabcdca720cf_dmitrydag
    I0416 11:22:34.367975 47476 executor.cpp:938] Command exited with status 1 (pid: 47482)
    I0416 11:22:35.371712 47481 process.cpp:887] Failed to accept socket: future discarded

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
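The last frames of the traceback show where the run fails: dill delegates the lookup to the standard pickle find_class, which calls __import__ on the module name stored in the pickle, and the agent has no module named unusual_prefix_..._dmitrydag. The name appears to be a synthetic module name the DAG file was loaded under on the scheduler side. A minimal sketch of that mechanism follows. It uses the standard-library pickle module in place of dill, and MOD_NAME is a made-up stand-in for the real name. It assumes, without confirming it for Airflow 1.9, that the callable ends up pickled by reference to that module rather than by value.

```python
import pickle
import sys
import types

# Hypothetical module name, modeled on the synthetic
# "unusual_prefix_<hash>_<filename>" name in the ImportError above.
MOD_NAME = "unusual_prefix_0000_exampledag"


def _testlambda(**kwargs):
    print("hello world")


# Register a throwaway module under the synthetic name and attach the
# callable to it, so pickle records that module as the callable's home.
mod = types.ModuleType(MOD_NAME)
_testlambda.__module__ = MOD_NAME
mod._testlambda = _testlambda
sys.modules[MOD_NAME] = mod

# A module-level function is pickled by reference: the payload holds
# only the module name and attribute name, not the function's code.
payload = pickle.dumps(_testlambda)

# Simulate the agent side, where no module with that name is importable.
del sys.modules[MOD_NAME]


def try_unpickle(data):
    """Return the ImportError raised while unpickling, or None on success."""
    try:
        pickle.loads(data)
    except ImportError as exc:  # ModuleNotFoundError on Python 3.6+
        return exc
    return None


err = try_unpickle(payload)
print(type(err).__name__, err)
```

Pickling succeeds because the module is registered in sys.modules at that moment. Unpickling fails in find_class for the same reason as on the Mesos agent: the recorded module name cannot be imported there.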