Hi all,
I have a problem with the Spark operator. When I run the task with `airflow test`, I get this exception:
user@host:/# airflow test myDAG myTask 2018-04-26
[2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to: sparkhost
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
    **kwargs)
  File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
    restore_signals, start_new_session)
  File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
    restore_signals, start_new_session, preexec_fn)
TypeError: Can't convert 'list' object to str implicitly
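To understand the error itself, I tried reproducing it in plain Python (this is just my own experiment, not Airflow code). The same TypeError appears whenever a list ends up where a string is expected, e.g. in string concatenation, so it looks like somewhere a list is being passed where `subprocess` expects a string:

```python
# Not Airflow code - just reproducing the kind of TypeError from the
# traceback. Concatenating a str with a list raises a TypeError; on
# Python 3.5 the message is exactly
# "Can't convert 'list' object to str implicitly"
# (newer Python versions word it slightly differently).
try:
    'spark-submit ' + ['--jars', 'spark_job.jar']
except TypeError as exc:
    print('TypeError raised:', exc)
```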
My DAG looks like this:
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

default_args = {
    'owner': 'spark',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG('myDAG', default_args=default_args)

connection_id = 'SPARK'
os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'

_config = {
    'jars': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
    'java_class': 'org.Job'
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)
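Since the TypeError suggests a list is reaching a place where a string is expected, I also double-checked on my own that every value in `_config` is a plain string (this check is mine, not part of the DAG):

```python
# Sanity check (mine, not Airflow's): every SparkSubmitOperator kwarg
# in my config is a plain string, so the list causing the TypeError
# does not seem to come from these values.
_config = {
    'jars': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': 'SPARK',
    'java_class': 'org.Job'
}

for key, value in _config.items():
    assert isinstance(value, str), '%s is not a string: %r' % (key, value)
print('all config values are strings')
```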
What is wrong? Could somebody help me with it?