[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables
[ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-5795: --- Description: My dag start_date is configed in Variables, {code:java} from datetime import datetime from airflow.models import Variable class Config(object): version = "V21" dag_start_date = datetime(2019, 10, 25) sf_schedule_report = "30 8 * * *" sf_schedule_etl = '30 1 * * *' sf_schedule_main = "45 3,4,5,6,7,8 * * *" CONFIG_KEY = 'sf_config_%s' % Config.version sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) if sf_config: for k, v in sf_config.items(): print(f'Overwrite {k} by {v}') if hasattr(Config, k): if k == 'dag_start_date': setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) else: setattr(Config, k, v) print(' config ### \n\n') print(f'CONFIG_KEY: {CONFIG_KEY}') print(f'CONFIG DICT: {sf_config}') print(' config end ### \n\n') {code} My dag init as : {code:java} dag = DAG('dm_sf_main_%s' % Config.version, start_date=Config.dag_start_date, default_args=default_args, schedule_interval=schedule, user_defined_filters={ 'mod' : lambda s, d:s%d }, ) {code} Variables is : !image-2019-10-28-12-32-55-531.png! But the log shows: * airflow tried 4 times * it didn't read Variable at first time * it use somewhere cached start start_date `2019-10-17` , schedule: 45 1,2,3,4,5,6 * * * (This is old settings on Variable 10 days ago ) * it did read new Variable on later attampt I have tried delete files, delete dag on webui, delete related task_instace in database. But still same error . I think there are some cache in scheduler memory cache , I can't restart airflow , so can't confirm. But this shouldn't happen. {code:java} *** Reading local file: /data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log [2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for [2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for [2019-10-17 02:45:15,644] {__init__.py:1353} INFO - [2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1 [2019-10-17 02:45:15,645] {__init__.py:1355} INFO - [2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing on 2019-10-17T01:45:00+08:00 [2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', '--job_id', '154142', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmp4p1ml6pq'] [2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=17786 [2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using executor CeleryExecutor [2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task schedule: 45 1,2,3,4,5,6 * * * [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task divisor: 5 [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ### found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf ! [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ## Job[sales_forecast_prophet] Command-line : [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task --- [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task spark-submitx [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task --- [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17
[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables
[ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-5795: --- Description: My dag start_date is configed in Variables, {code:java} from datetime import datetime from airflow.models import Variable class Config(object): version = "V21" dag_start_date = datetime(2019, 10, 25) sf_schedule_report = "30 8 * * *" sf_schedule_etl = '30 1 * * *' sf_schedule_main = "45 3,4,5,6,7,8 * * *" CONFIG_KEY = 'sf_config_%s' % Config.version sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) if sf_config: for k, v in sf_config.items(): print(f'Overwrite {k} by {v}') if hasattr(Config, k): if k == 'dag_start_date': setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) else: setattr(Config, k, v) print(' config ### \n\n') print(f'CONFIG_KEY: {CONFIG_KEY}') print(f'CONFIG DICT: {sf_config}') print(' config end ### \n\n') {code} My dag init as : {code:java} dag = DAG('dm_sf_etl_%s' % Config.version, start_date=Config.dag_start_date, default_args=default_args, schedule_interval=schedule, user_defined_filters={ 'mod' : lambda s, d:s%d }, ) {code} Variables is : !image-2019-10-28-12-32-55-531.png! But the log shows: * airflow tried 4 times * it didn't read Variable at first time * it use somewhere cached start start_date `2019-10-17` , schedule: 45 1,2,3,4,5,6 * * * (This is old settings on Variable 10 days ago ) * it did read new Variable on later attampt I have tried delete files, delete dag on webui, delete related task_instace in database. But still same error . I think there are some cache in scheduler memory cache , I can't restart airflow , so can't confirm. But this shouldn't happen. {code:java} *** Reading local file: /data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log [2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for [2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for [2019-10-17 02:45:15,644] {__init__.py:1353} INFO - [2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1 [2019-10-17 02:45:15,645] {__init__.py:1355} INFO - [2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing on 2019-10-17T01:45:00+08:00 [2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', '--job_id', '154142', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmp4p1ml6pq'] [2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=17786 [2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using executor CeleryExecutor [2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task schedule: 45 1,2,3,4,5,6 * * * [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task divisor: 5 [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ### found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf ! [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ## Job[sales_forecast_prophet] Command-line : [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task --- [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task spark-submitx [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task --- [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17
[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables
[ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-5795: --- Attachment: (was: 1)A]~UL_N@TVU072M)68WHP.png) > Airflow cached old Code and Veriables > -- > > Key: AIRFLOW-5795 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5795 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, database >Affects Versions: 1.10.3 >Reporter: kasim >Priority: Major > Attachments: image-2019-10-28-12-32-55-531.png > > > My dag start_date is configed in Variables, > > {code:java} > from datetime import datetime > from airflow.models import Variable > class Config(object): > version = "V21" > > dag_start_date = datetime(2019, 10, 25) > sf_schedule_report = "30 8 * * *" > sf_schedule_etl = '30 1 * * *' > sf_schedule_main = "45 3,4,5,6,7,8 * * *" > CONFIG_KEY = 'sf_config_%s' % Config.version > sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) > if sf_config: > for k, v in sf_config.items(): > print(f'Overwrite {k} by {v}') > if hasattr(Config, k): > if k == 'dag_start_date': > setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) > else: > setattr(Config, k, v) > print(' config ### \n\n') > print(f'CONFIG_KEY: {CONFIG_KEY}') > print(f'CONFIG DICT: {sf_config}') > print(' config end ### \n\n') > {code} > My dag init as : > {code:java} > dag = DAG('dm_sf_etl_%s' % Config.version, > start_date=Config.dag_start_date, > default_args=default_args, > schedule_interval=schedule, > user_defined_filters={ > 'mod' : lambda s, d:s%d > }, > ) > {code} > Variables is : > !image-2019-10-28-12-32-55-531.png! > > > But the log shows: > * airflow tried 4 times > * it didn't read Variable at first time > * it use somewhere cached start start_date `2019-10-17` , schedule: 45 > 1,2,3,4,5,6 * * * (This is old settings on Variable 10 days ago ) > * it did read new Variable on later attampt > > I have tried delete files, delete dag on webui, delete related task_instace > in database. But still same error . > I think there are some cache in scheduler memory cache , I can't restart > airflow , so can't confirm. But this shouldn't happen. > > {code:java} > *** Reading local file: > /data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log > [2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for > > [2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for > > [2019-10-17 02:45:15,644] {__init__.py:1353} INFO - > > [2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1 > [2019-10-17 02:45:15,645] {__init__.py:1355} INFO - > > [2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing > on 2019-10-17T01:45:00+08:00 > [2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: > ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', > '2019-10-17T01:45:00+08:00', '--job_id', '154142', '--raw', '-sd', > 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmp4p1ml6pq'] > [2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - > settings.configure_orm(): Using pool settings. pool_size=5, > pool_recycle=1800, pid=17786 > [2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using > executor CeleryExecutor > [2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - > Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task schedule: 45 1,2,3,4,5,6 * * * > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task divisor: 5 > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task ### found sales_forecast_prophet in > operator_pyspark_conf, overwrite default conf ! > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task ##
[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables
[ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-5795: --- Attachment: image-2019-10-28-12-32-55-531.png > Airflow cached old Code and Veriables > -- > > Key: AIRFLOW-5795 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5795 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, database >Affects Versions: 1.10.3 >Reporter: kasim >Priority: Major > Attachments: 1)A]~UL_N@TVU072M)68WHP.png, > image-2019-10-28-12-32-55-531.png > > > My dag start_date is configed in Variables, > > {code:java} > from datetime import datetime > from airflow.models import Variable > class Config(object): > version = "V21" > > dag_start_date = datetime(2019, 10, 25) > sf_schedule_report = "30 8 * * *" > sf_schedule_etl = '30 1 * * *' > sf_schedule_main = "45 3,4,5,6,7,8 * * *" > CONFIG_KEY = 'sf_config_%s' % Config.version > sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) > if sf_config: > for k, v in sf_config.items(): > print(f'Overwrite {k} by {v}') > if hasattr(Config, k): > if k == 'dag_start_date': > setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) > else: > setattr(Config, k, v) > print(' config ### \n\n') > print(f'CONFIG_KEY: {CONFIG_KEY}') > print(f'CONFIG DICT: {sf_config}') > print(' config end ### \n\n') > {code} > My dag init as : > {code:java} > dag = DAG('dm_sf_etl_%s' % Config.version, > start_date=Config.dag_start_date, > default_args=default_args, > schedule_interval=schedule, > user_defined_filters={ > 'mod' : lambda s, d:s%d > }, > ) > {code} > Variables is : > !image-2019-10-28-12-25-56-300.png! > > > But the log shows: > * airflow tried 4 times > * it didn't read Variable at first time > * it use somewhere cached start start_date `2019-10-17` , schedule: 45 > 1,2,3,4,5,6 * * * (This is old settings on Variable 10 days ago ) > * it did read new Variable on later attampt > > I have tried delete files, delete dag on webui, delete related task_instace > in database. But still same error . > I think there are some cache in scheduler memory cache , I can't restart > airflow , so can't confirm. But this shouldn't happen. > > {code:java} > *** Reading local file: > /data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log > [2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for > > [2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for > > [2019-10-17 02:45:15,644] {__init__.py:1353} INFO - > > [2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1 > [2019-10-17 02:45:15,645] {__init__.py:1355} INFO - > > [2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing > on 2019-10-17T01:45:00+08:00 > [2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: > ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', > '2019-10-17T01:45:00+08:00', '--job_id', '154142', '--raw', '-sd', > 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmp4p1ml6pq'] > [2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - > settings.configure_orm(): Using pool settings. pool_size=5, > pool_recycle=1800, pid=17786 > [2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using > executor CeleryExecutor > [2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - > Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task schedule: 45 1,2,3,4,5,6 * * * > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task divisor: 5 > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task ### found sales_forecast_prophet in > operator_pyspark_conf, overwrite default conf ! > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask
[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables
[ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-5795: --- Attachment: 1)A]~UL_N@TVU072M)68WHP.png > Airflow cached old Code and Veriables > -- > > Key: AIRFLOW-5795 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5795 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, database >Affects Versions: 1.10.3 >Reporter: kasim >Priority: Major > Attachments: 1)A]~UL_N@TVU072M)68WHP.png, > image-2019-10-28-12-32-55-531.png > > > My dag start_date is configed in Variables, > > {code:java} > from datetime import datetime > from airflow.models import Variable > class Config(object): > version = "V21" > > dag_start_date = datetime(2019, 10, 25) > sf_schedule_report = "30 8 * * *" > sf_schedule_etl = '30 1 * * *' > sf_schedule_main = "45 3,4,5,6,7,8 * * *" > CONFIG_KEY = 'sf_config_%s' % Config.version > sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={}) > if sf_config: > for k, v in sf_config.items(): > print(f'Overwrite {k} by {v}') > if hasattr(Config, k): > if k == 'dag_start_date': > setattr(Config, k, datetime.strptime(v, '%Y-%m-%d') ) > else: > setattr(Config, k, v) > print(' config ### \n\n') > print(f'CONFIG_KEY: {CONFIG_KEY}') > print(f'CONFIG DICT: {sf_config}') > print(' config end ### \n\n') > {code} > My dag init as : > {code:java} > dag = DAG('dm_sf_etl_%s' % Config.version, > start_date=Config.dag_start_date, > default_args=default_args, > schedule_interval=schedule, > user_defined_filters={ > 'mod' : lambda s, d:s%d > }, > ) > {code} > Variables is : > !image-2019-10-28-12-25-56-300.png! > > > But the log shows: > * airflow tried 4 times > * it didn't read Variable at first time > * it use somewhere cached start start_date `2019-10-17` , schedule: 45 > 1,2,3,4,5,6 * * * (This is old settings on Variable 10 days ago ) > * it did read new Variable on later attampt > > I have tried delete files, delete dag on webui, delete related task_instace > in database. But still same error . > I think there are some cache in scheduler memory cache , I can't restart > airflow , so can't confirm. But this shouldn't happen. > > {code:java} > *** Reading local file: > /data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log > [2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for > > [2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for > > [2019-10-17 02:45:15,644] {__init__.py:1353} INFO - > > [2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1 > [2019-10-17 02:45:15,645] {__init__.py:1355} INFO - > > [2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing > on 2019-10-17T01:45:00+08:00 > [2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: > ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', > '2019-10-17T01:45:00+08:00', '--job_id', '154142', '--raw', '-sd', > 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmp4p1ml6pq'] > [2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - > settings.configure_orm(): Using pool settings. pool_size=5, > pool_recycle=1800, pid=17786 > [2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using > executor CeleryExecutor > [2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - > Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task schedule: 45 1,2,3,4,5,6 * * * > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task divisor: 5 > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task ### found sales_forecast_prophet in > operator_pyspark_conf, overwrite default conf ! > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask branch_task > [2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: > Subtask