[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5795:
---
Description: 
My DAG's start_date is configured via an Airflow Variable:

 
{code:python}
from datetime import datetime
from airflow.models import Variable


class Config(object):
    version = "V21"

    dag_start_date = datetime(2019, 10, 25)
    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main = "45 3,4,5,6,7,8 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version

sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            else:
                setattr(Config, k, v)

print(' config ### \n\n')

print(f'CONFIG_KEY: {CONFIG_KEY}')
print(f'CONFIG DICT: {sf_config}')

print(' config end ### \n\n')
{code}
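The overwrite logic above can be sketched stand-alone (with the Airflow Variable lookup stubbed by a plain dict, so only the standard library is needed) to show how a stale Variable value silently replaces the in-code defaults at DAG-parse time:

```python
from datetime import datetime

# Minimal sketch of the Config-overwrite logic; the Variable.get(...)
# call is stubbed with a plain dict for illustration.
class Config(object):
    version = "V21"
    dag_start_date = datetime(2019, 10, 25)
    sf_schedule_main = "45 3,4,5,6,7,8 * * *"

# Pretend this is what Variable.get(...) returned -- e.g. a stale value.
sf_config = {'dag_start_date': '2019-10-17',
             'sf_schedule_main': '45 1,2,3,4,5,6 * * *'}

for k, v in sf_config.items():
    if hasattr(Config, k):
        if k == 'dag_start_date':
            # the Variable stores a date string; parse it to a datetime
            setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
        else:
            setattr(Config, k, v)

print(Config.dag_start_date)  # the in-code default (2019-10-25) is gone
```

Because this runs at module import time, every process that parses the DAG file (scheduler, webserver, each worker) performs its own Variable lookup, and they can disagree if one of them reads a stale copy of the file or a different database state.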
The DAG is initialized as:
{code:python}
dag = DAG('dm_sf_main_%s' % Config.version,
          start_date=Config.dag_start_date,
          default_args=default_args,
          schedule_interval=schedule,
          user_defined_filters={
              'mod': lambda s, d: s % d
          },
          )
{code}
The Variable is:

!image-2019-10-28-12-32-55-531.png!


But the log shows:
 * Airflow tried 4 times
 * it did not read the Variable on the first attempt
 * it used a cached start_date of `2019-10-17` and schedule `45 1,2,3,4,5,6 * * *` from somewhere (these are the old Variable settings from 10 days ago)
 * it did read the new Variable on a later attempt


I have tried deleting the files, deleting the DAG in the web UI, and deleting the related task_instance rows in the database, but the error stays the same.

I suspect there is some cache in the scheduler's memory. I can't restart Airflow, so I can't confirm this, but this shouldn't happen either way.
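One stale-state source worth ruling out (an assumption on my side, not confirmed from these logs) is old compiled bytecode: if a worker keeps importing the DAG through an out-of-date `.pyc`, edits to the source would not take effect there. A small stdlib-only check, with the path from this report shown as a hypothetical example:

```python
import importlib.util
import os


def stale_bytecode(source_path):
    """Return True if a cached .pyc exists but is older than the source,
    i.e. Python should recompile on import -- if changes still don't show
    up, the cache (or the file being imported) is suspect."""
    pyc = importlib.util.cache_from_source(source_path)
    if not os.path.exists(pyc):
        return False  # no cache present, nothing to go stale
    return os.path.getmtime(pyc) < os.path.getmtime(source_path)


# Hypothetical usage on the DAG file from this report:
# stale_bytecode('/opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py')
```

Running this on each worker (not just the scheduler host) would help distinguish "old code" from "old Variable" as the cause.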

 
{code}
*** Reading local file: 
/data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log
[2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for 

[2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for 

[2019-10-17 02:45:15,644] {__init__.py:1353} INFO - 

[2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-10-17 02:45:15,645] {__init__.py:1355} INFO - 

[2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing 
 on 2019-10-17T01:45:00+08:00
[2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: ['airflow', 
'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', 
'--job_id', '154142', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', 
'--cfg_path', '/tmp/tmp4p1ml6pq']
[2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, 
pid=17786
[2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using executor 
CeleryExecutor
[2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - Filling up the 
DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task schedule: 45 1,2,3,4,5,6 * * *
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task divisor: 5
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, 
overwrite default conf !
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task ## Job[sales_forecast_prophet] Command-line :
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task ---
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task spark-submitx
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task ---
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 


[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5795:
---
Attachment: (was: 1)A]~UL_N@TVU072M)68WHP.png)

> Airflow cached old Code and Veriables 
> --
>
> Key: AIRFLOW-5795
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5795
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG, database
>Affects Versions: 1.10.3
>Reporter: kasim
>Priority: Major
> Attachments: image-2019-10-28-12-32-55-531.png
>

[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5795:
---
Attachment: image-2019-10-28-12-32-55-531.png


[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5795:
---
Attachment: 1)A]~UL_N@TVU072M)68WHP.png
