[jira] [Updated] (AIRFLOW-5927) Airflow cache import file or variable

2019-11-13 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5927:
---
Description: 
I have a `config.py` that pulls configuration from `Variable` and merges it into a default config:

 
{code:java}
from datetime import datetime
from airflow.models import Variable


class Config:
    version = "V21"

    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://x/data/dm/sales_forecast/fbprophet/version={version}'

    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'

    dag_start_date = datetime(2019, 10, 25)

    etl_start_time = "2019-06-01 00:00:00"
    etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"
    train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
    train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"
    report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
    report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main_flow = "45 2 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            elif v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
{code}
 

 

 

And I have 5 DAG files that import this Config. They contain similar code, like:
{code:java}
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'schedule_interval': schedule_interval,
}
{code}
{code:java}
dag = DAG('dm_sfx_etl_%s' % Config.version,
          start_date=Config.dag_start_date,
          default_args=default_args,
          schedule_interval='20 1 * * *',
          user_defined_filters={'mod': lambda s, d: s % d},
          )
{code}
 
  
The strange thing is:

Changing `sf_schedule_etl` in the Variable took effect several times, but at some point I could no longer change it from the Variable at all, even when I hard-coded it directly:
  
```
dag = DAG('dm_sfx_etl_%s' % Config.version,
          start_date=Config.dag_start_date,
          default_args=default_args,
          schedule_interval='20 1 * * *',
          user_defined_filters={'mod': lambda s, d: s % d},
          )
```
  
Once this happened, even deleting the DAG file and deleting the DAG from the Airflow web UI did not change `schedule_interval`.
  
PS: my DAG files have been running for some days. During that time I may have added operators or changed an operator's type, and everything was still fine. I think there must be some cache in Airflow causing this problem.
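A diagnostic sketch I would try here (my own assumption, not an official Airflow procedure): parse the DAG folder in a fresh Python process on the scheduler host with `DagBag` and print the schedule it produces. If this prints the new value while the running scheduler still applies the old one, the stale value is being held by a long-lived process rather than by the file or the Variable:

```python
# Hedged diagnostic: build a fresh DagBag and inspect the parsed DAG.
from airflow.models import DagBag

bag = DagBag()  # parses the configured dags_folder in this process
dag = bag.get_dag('dm_sfx_etl_V21')  # dag id from the snippet above
print(dag.schedule_interval, dag.start_date)
```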
  
  
  
  
  

[jira] [Created] (AIRFLOW-5927) Airflow cache import file or variable

2019-11-13 Thread kasim (Jira)
kasim created AIRFLOW-5927:
--

 Summary: Airflow cache import file or variable
 Key: AIRFLOW-5927
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5927
 Project: Apache Airflow
  Issue Type: Bug
  Components: DAG, database
Affects Versions: 1.10.3
Reporter: kasim


I have a `config.py` that pulls configuration from `Variable` and merges it into a default config:

 
```python
from datetime import datetime

from airflow.models import Variable


class Config:
    version = "V21"

    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}'

    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'

    dag_start_date = datetime(2019, 10, 25)

    etl_start_time = "2019-06-01 00:00:00"
    etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"
    train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
    train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"
    report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
    report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main_flow = "45 2 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version

sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                print(datetime.strptime(v, '%Y-%m-%d'))
                print(type(datetime.strptime(v, '%Y-%m-%d')))

                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))

                print(Config.dag_start_date)
                print(type(Config.dag_start_date))

            if v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
```


 
And I have 5 DAG files that import this Config. They contain similar code, like:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
}

print(Config.dag_start_date)
print(type(Config.dag_start_date))


dag = DAG('dm_sfx_etl_%s' % Config.version,
          start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d'),
          default_args=default_args,
          schedule_interval=Config.sf_schedule_etl,
          user_defined_filters={'mod': lambda s, d: s % d},
          )

# other codes
```
 
 
The strange thing is:
 
1. At first, in the DAG files that do `from sf_dags_n.config import Config`, `Config.dag_start_date` was a datetime. But several days ago it became a str: when I check the type, `Config.dag_start_date` is still a datetime inside `config.py`, but in the DAG file it is a str. As I remember, I originally set the default `Config.dag_start_date` as a str and changed it to a datetime when a problem occurred; it was then fine for some time before it went wrong again.
 
2. Changing `sf_schedule_etl` in the Variable took effect several times, but at some point I could no longer change it from the Variable at all, even when I hard-coded it directly:
 
```
dag = DAG('dm_sfx_etl_%s' % Config.version,
          start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d'),
          default_args=default_args,
          schedule_interval='20 1 * * *',
          user_defined_filters={'mod': lambda s, d: s % d},
          )
```
 
Once this happened, even deleting the DAG file and deleting the DAG from the Airflow web UI did not change `schedule_interval`.
 
PS: my DAG files have been running for some days. During that time I may have added operators or changed an operator's type, and everything was still fine. I think there must be some cache in Airflow causing this problem.
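For what it is worth, one property of Python itself may be relevant here (this is my reading of the situation, not a confirmed Airflow behaviour): the module-level `Variable.get()` merge in `config.py` only runs on the first import within a given process, because Python caches imported modules in `sys.modules`. A long-lived scheduler or worker process would therefore keep whatever values were merged when it first imported the module:

```python
import sys

# First import executes config.py's top-level code: Variable.get() plus the setattr() merge.
from sf_dags_n import config

# Later imports in the same process are served from the module cache,
# so the merge does not run again and Config keeps the values from the first import.
assert 'sf_dags_n.config' in sys.modules
from sf_dags_n import config as config_again
assert config_again is config
```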
 
 
 
 
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)
kasim created AIRFLOW-5795:
--

 Summary: Airflow cached old Code and Veriables 
 Key: AIRFLOW-5795
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5795
 Project: Apache Airflow
  Issue Type: Bug
  Components: DAG, database
Affects Versions: 1.10.3
Reporter: kasim


My DAG start_date is configured in Variables:

 
{code:java}
from datetime import datetime
from airflow.models import Variable


class Config(object):
    version = "V21"

    dag_start_date = datetime(2019, 10, 25)
    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main = "45 3,4,5,6,7,8 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version

sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            else:
                setattr(Config, k, v)

print(' config ### \n\n')

print(f'CONFIG_KEY: {CONFIG_KEY}')
print(f'CONFIG DICT: {sf_config}')

print(' config end ### \n\n')
{code}
My DAG is initialized as:
{code:java}
dag = DAG('dm_sf_etl_%s' % Config.version,
          start_date=Config.dag_start_date,
          default_args=default_args,
          schedule_interval=schedule,
          user_defined_filters={'mod': lambda s, d: s % d},
          )
{code}
The Variable is:

!image-2019-10-28-12-25-56-300.png!

 

 

But the log shows:
 * Airflow tried 4 times
 * it did not read the Variable on the first attempt
 * it used a cached start_date `2019-10-17` and schedule `45 1,2,3,4,5,6 * * *` from somewhere (these are the old Variable settings from 10 days ago)
 * it did read the new Variable on a later attempt

 

I have tried deleting the files, deleting the DAG in the web UI, and deleting the related task_instance rows in the database, but I still get the same error.

I think there is some cache in the scheduler's memory. I can't restart Airflow, so I can't confirm this, but it shouldn't happen.
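One thing that might help narrow this down (a hypothetical debugging line of my own, not something from the original DAG): printing the process id together with the merged values at parse time, e.g. at the bottom of `config.py`, so the task log shows which process parsed the file and what that process actually read from the Variable:

```python
import os

# Hypothetical parse-time trace: shows, per process, which values this parse saw.
print(f'parsed by pid={os.getpid()}, '
      f'start_date={Config.dag_start_date}, schedule={Config.sf_schedule_main}')
```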

 
{code:java}
*** Reading local file: 
/data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log
[2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for 

[2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for 

[2019-10-17 02:45:15,644] {__init__.py:1353} INFO - 

[2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-10-17 02:45:15,645] {__init__.py:1355} INFO - 

[2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing 
 on 2019-10-17T01:45:00+08:00
[2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: ['airflow', 
'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', 
'--job_id', '154142', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', 
'--cfg_path', '/tmp/tmp4p1ml6pq']
[2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, 
pid=17786
[2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using executor 
CeleryExecutor
[2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - Filling up the 
DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task schedule: 45 1,2,3,4,5,6 * * *
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task divisor: 5
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task ###  found sales_forecast_prophet in operator_pyspark_conf, 
overwrite default conf !
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task ## Job[sales_forecast_prophet] Command-line :
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task ---
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task spark-submitx
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask 
branch_task 
[2019-10-17 02:45:16,704] 
{code}

[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5795:
---
Attachment: (was: 1)A]~UL_N@TVU072M)68WHP.png)

[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5795:
---
Attachment: image-2019-10-28-12-32-55-531.png

[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Veriables

2019-10-27 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5795:
---
Attachment: 1)A]~UL_N@TVU072M)68WHP.png

[jira] [Closed] (AIRFLOW-4806) Raise `Cannot execute error` even if that dag successed

2019-09-23 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim closed AIRFLOW-4806.
--
Resolution: Not A Bug

> Raise `Cannot execute error` even if that dag successed
> ---
>
> Key: AIRFLOW-4806
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4806
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun
>Affects Versions: 1.10.3
>Reporter: kasim
>Priority: Major
>
>  
> airflow log : 
>  
> {code:java}
> *** Log file does not exist: 
> /opt/airflow/logs/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log
> *** Fetching from: 
> http://dc07:8793/log/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log
> [2019-06-18 15:58:33,562] {__init__.py:1139} INFO - Dependencies all met for 
>  [queued]>
> [2019-06-18 15:58:33,585] {__init__.py:1139} INFO - Dependencies all met for 
>  [queued]>
> [2019-06-18 15:58:33,585] {__init__.py:1353} INFO - 
> 
> [2019-06-18 15:58:33,585] {__init__.py:1354} INFO - Starting attempt 7 of 7
> [2019-06-18 15:58:33,585] {__init__.py:1355} INFO - 
> 
> [2019-06-18 15:58:33,594] {__init__.py:1374} INFO - Executing 
>  on 2019-05-31T16:00:00+00:00
> [2019-06-18 15:58:33,594] {base_task_runner.py:119} INFO - Running: 
> ['airflow', 'run', 'test_wordcount', 'test_wordcount', 
> '2019-05-31T16:00:00+00:00', '--job_id', '413', '--raw', '-sd', 
> 'DAGS_FOLDER/wordcount.py', '--cfg_path', '/tmp/tmpkqb2n943']
> [2019-06-18 15:58:34,094] {base_task_runner.py:101} INFO - Job 413: Subtask 
> test_wordcount 
> /opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/configuration.py:590:
>  DeprecationWarning: You have two airflow.cfg files: 
> /opt/airflow/airflow/airflow.cfg and /opt/airflow/airflow.cfg. Airflow used 
> to look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a 
> different value. Airflow will now only read /opt/airflow/airflow.cfg, and you 
> should remove the other file
> [2019-06-18 15:58:34,095] {base_task_runner.py:101} INFO - Job 413: Subtask 
> test_wordcount   category=DeprecationWarning,
> [2019-06-18 15:58:34,191] {base_task_runner.py:101} INFO - Job 413: Subtask 
> test_wordcount [2019-06-18 15:58:34,191] {settings.py:182} INFO - 
> settings.configure_orm(): Using pool settings. pool_size=5, 
> pool_recycle=1800, pid=30605
> [2019-06-18 15:58:34,429] {base_task_runner.py:101} INFO - Job 413: Subtask 
> test_wordcount [2019-06-18 15:58:34,429] {default_celery.py:90} WARNING - You 
> have configured a result_backend of redis://192.168.20.17/1, it is highly 
> recommended to use an alternative result_backend (i.e. a database).
> [2019-06-18 15:58:34,430] {base_task_runner.py:101} INFO - Job 413: Subtask 
> test_wordcount [2019-06-18 15:58:34,430] {__init__.py:51} INFO - Using 
> executor CeleryExecutor
> [2019-06-18 15:58:34,704] {base_task_runner.py:101} INFO - Job 413: Subtask 
> test_wordcount [2019-06-18 15:58:34,704] {__init__.py:305} INFO - Filling up 
> the DagBag from /opt/airflow/dags/wordcount.py
> [2019-06-18 15:58:34,754] {base_task_runner.py:101} INFO - Job 413: Subtask 
> test_wordcount [2019-06-18 15:58:34,754] {cli.py:517} INFO - Running 
>  [running]> on host dc07
> [2019-06-18 15:58:34,875] {logging_mixin.py:95} INFO - [2019-06-18 
> 15:58:34,875] {base_hook.py:83} INFO - Using connection to: id: 
> spark_default. Host: yarn, Port: None, Schema: None, Login: None, Password: 
> None, extra: {'master': 'yarn', 'deploy-mode': 'cluster', 'queue': 'data', 
> 'env_vars': {'HADOOP_USER_NAME': 'hdfs'}, 'spark_home': 
> '/opt/cloudera/parcels/CDH/lib/spark/'}
> [2019-06-18 15:58:34,876] {logging_mixin.py:95} INFO - [2019-06-18 
> 15:58:34,876] {spark_submit_hook.py:295} INFO - Spark-Submit cmd: 
> ['spark-submit', '--master', 'yarn', '--py-files', 
> '/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip', '--num-executors', 
> '1', '--executor-cores', '1', '--executor-memory', '1g', '--driver-memory', 
> '1g', '--name', 'test_wordcount', '--queue', 'data', '--deploy-mode', 
> 'cluster', '/opt/airflow/dags/main.py', '--job', 'wordcount', '--job-args', 
> 'input_path=/test/words.txt', 'output_path=/test/wordcount.csv']
> [2019-06-18 15:58:41,557] {logging_mixin.py:95} INFO - [2019-06-18 
> 15:58:41,557] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
> application_1560762064551_0032
> [2019-06-18 15:58:44,791] {logging_mixin.py:95} INFO - [2019-06-18 
> 15:58:44,790] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
> application_1560762064551_0032
> [2019-06-18 15:58:44,929] {logging_mixin.py:95} INFO - [2019-06-18 
> 15:58:44,929] {spark_submit_hook.py:400} INFO - Identified spark 
> {code}

[jira] [Updated] (AIRFLOW-5538) Add a flag to make scheduling trigger on start_date instead of execution_date (make execution_date equal to start_date)

2019-09-22 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5538:
---
Description: 
From [https://airflow.apache.org/scheduler.html]:

> Note that if you run a DAG on a schedule_interval of one day, the run
 > stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In
 > other words, the job instance is started once the period it covers has
 > ended.

This feature is very hurt .

For example I have etl job which run every day, schedule_interval is `0 1 * * 
*`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But my 
etl is processing all data before start_date , means data range is between 
(history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because this 
is unable to reproduce. This force me add 1 day to execution_date:
 ```python
 etl_end_time = "\{{ (execution_date + 
macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
 ```

However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , 
the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is the 
day after execution_date . Instead of adding a day, I had to changed 
schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run.
 And in this situation , you don't need add one day to execution_date , this 
means you have to define two `etl_end_time` to represent a same date in jobs 
with different schedule_interval .

All these are very uncomfortable for me , adding a config or built-in method to 
make execution_date equal to start_date. would be very nice . If this become 
true, would only have one etl_end_time = "\{{ 
(execution_date.strftime('%Y-%m-%d 00:00:00') }}" . 
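For comparison, a single end-time expression can already be approximated with
the `next_execution_date` template variable, which for scheduled runs is the
start of the next period, i.e. the moment the run is actually triggered. A
minimal sketch, assuming Airflow 1.10.x; the dag_id and task below are
illustrative only:
 ```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'etl_window_example',              # illustrative dag_id, not a real DAG
    start_date=datetime(2019, 9, 1),
    schedule_interval='0 1 * * *',
)

# next_execution_date is the start of the next schedule period, i.e. the
# wall-clock time this run fires, so the same expression works for both the
# daily and the '45 2,3,4,5,6 * * *' schedule without a manual +1 day shift.
etl_end_time = "{{ next_execution_date.strftime('%Y-%m-%d 00:00:00') }}"

extract = BashOperator(
    task_id='extract',
    bash_command='echo "processing data up to ' + etl_end_time + '"',
    dag=dag,
)
 ```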

  was:
From [https://airflow.apache.org/scheduler.html] :

> Note that if you run a DAG on a schedule_interval of one day, the run
 > stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In
 > other words, the job instance is started once the period it covers has
 > ended.

This feature is very hurt .

For example I have etl job which run every day, schedule_interval is `0 1 * * 
*`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But my 
etl is processing all data before start_date , means data range is between 
(history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because this 
is unable to reproduce. This force me add 1 day to execution_date:
 ```python
 etl_end_time = "\{{ (execution_date + 
macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
 ```

However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , 
the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is the 
day after execution_date . Instead of adding a day, I had to changed 
schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run.
 And in this situation , you don't need add one day to execution_date , this 
means you have to define two `etl_end_time` to represent a same date in jobs 
with different schedule_interval .

All these are very uncomfortable for me , adding a config or built-in method to 
make execution_date equal to start_date. would be very nice . 


> Add a flag to make scheduling trigger on start_date instead of execution_date 
> (make execution_date equal to start_date)
> ---
>
> Key: AIRFLOW-5538
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5538
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: DagRun
>Affects Versions: 1.10.5
>Reporter: kasim
>Priority: Major
>
> From [https://airflow.apache.org/scheduler.html] :
> > Note that if you run a DAG on a schedule_interval of one day, the run
>  > stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In
>  > other words, the job instance is started once the period it covers has
>  > ended.
> This feature is very hurt .
> For example I have etl job which run every day, schedule_interval is `0 1 * * 
> *`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But 
> my etl is processing all data before start_date , means data range is between 
> (history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because 
> this is unable to reproduce. This force me add 1 day to execution_date:
>  ```python
>  etl_end_time = "\{{ (execution_date + 
> macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
>  ```
> However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , 
> the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is 
> the day after execution_date . Instead of adding a day, I had to changed 
> schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run.
>  And in this situation , you don't need add one day to 

[jira] [Updated] (AIRFLOW-5538) Add a flag to make scheduling trigger on start_date instead of execution_date (make execution_date equal to start_date)

2019-09-22 Thread kasim (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-5538:
---
Description: 
From [https://airflow.apache.org/scheduler.html] :

> Note that if you run a DAG on a schedule_interval of one day, the run
 > stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In
 > other words, the job instance is started once the period it covers has
 > ended.

This feature is very hurt .

For example I have etl job which run every day, schedule_interval is `0 1 * * 
*`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But my 
etl is processing all data before start_date , means data range is between 
(history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because this 
is unable to reproduce. This force me add 1 day to execution_date:
 ```python
 etl_end_time = "\{{ (execution_date + 
macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
 ```

However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , 
the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is the 
day after execution_date . Instead of adding a day, I had to changed 
schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run.
 And in this situation , you don't need add one day to execution_date , this 
means you have to define two `etl_end_time` to represent a same date in jobs 
with different schedule_interval .

All these are very uncomfortable for me , adding a config or built-in method to 
make execution_date equal to start_date. would be very nice . 

  was:
From https://airflow.apache.org/scheduler.html :

> Note that if you run a DAG on a schedule_interval of one day, the run
> stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In
> other words, the job instance is started once the period it covers has
> ended.

This feature is very hurt .

For example I have etl job which run every day, schedule_interval is `0 1 * * 
*`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But my 
etl is processing all data before start_date , means data range is between 
(history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because this 
is unable to reproduce. This force me add 1 day to execution_date:
```python
etl_end_time = "\{{ (execution_date + 
macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
```

However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , 
the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is the 
day after execution_date . Instead of adding a day, I had to changed 
schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run.
And in this situation , you don't need add one day to execution_date , this 
means you have to define two `etl_end_time` to represent a same date in jobs 
with different schedule_interval .

All these are very uncomfortable for me , is there any config or built-in 
method to make execution_date equal to start_date ? Or I have to modify airflow 
source code ...

 


> Add a flag to make scheduling trigger on start_date instead of execution_date 
> (make execution_date equal to start_date)
> ---
>
> Key: AIRFLOW-5538
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5538
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: DagRun
>Affects Versions: 1.10.5
>Reporter: kasim
>Priority: Major
>
> From [https://airflow.apache.org/scheduler.html] :
> > Note that if you run a DAG on a schedule_interval of one day, the run
>  > stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In
>  > other words, the job instance is started once the period it covers has
>  > ended.
> This feature is very hurt .
> For example I have etl job which run every day, schedule_interval is `0 1 * * 
> *`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But 
> my etl is processing all data before start_date , means data range is between 
> (history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because 
> this is unable to reproduce. This force me add 1 day to execution_date:
>  ```python
>  etl_end_time = "\{{ (execution_date + 
> macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
>  ```
> However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , 
> the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is 
> the day after execution_date . Instead of adding a day, I had to changed 
> schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run.
>  And in this situation , you don't need add one day to execution_date , this 
> means you have to define two `etl_end_time` to represent a same date in 

[jira] [Created] (AIRFLOW-5538) Add a flag to make scheduling trigger on start_date instead of execution_date (make execution_date equal to start_date)

2019-09-22 Thread kasim (Jira)
kasim created AIRFLOW-5538:
--

 Summary: Add a flag to make scheduling trigger on start_date 
instead of execution_date (make execution_date equal to start_date)
 Key: AIRFLOW-5538
 URL: https://issues.apache.org/jira/browse/AIRFLOW-5538
 Project: Apache Airflow
  Issue Type: Improvement
  Components: DagRun
Affects Versions: 1.10.5
Reporter: kasim


From https://airflow.apache.org/scheduler.html :

> Note that if you run a DAG on a schedule_interval of one day, the run
> stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In
> other words, the job instance is started once the period it covers has
> ended.

This feature is very hurt .

For example I have etl job which run every day, schedule_interval is `0 1 * * 
*`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But my 
etl is processing all data before start_date , means data range is between 
(history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because this 
is unable to reproduce. This force me add 1 day to execution_date:
```python
etl_end_time = "\{{ (execution_date + 
macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
```

However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , 
the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is the 
day after execution_date . Instead of adding a day, I had to changed 
schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run.
And in this situation , you don't need add one day to execution_date , this 
means you have to define two `etl_end_time` to represent a same date in jobs 
with different schedule_interval .

All these are very uncomfortable for me , is there any config or built-in 
method to make execution_date equal to start_date ? Or I have to modify airflow 
source code ...

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-4806) Raise `Cannot execute error` even if that dag succeeded

2019-06-18 Thread kasim (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-4806:
---
Description: 
 

airflow log : 

 
{code:java}
*** Log file does not exist: 
/opt/airflow/logs/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log
*** Fetching from: 
http://dc07:8793/log/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log

[2019-06-18 15:58:33,562] {__init__.py:1139} INFO - Dependencies all met for 

[2019-06-18 15:58:33,585] {__init__.py:1139} INFO - Dependencies all met for 

[2019-06-18 15:58:33,585] {__init__.py:1353} INFO - 

[2019-06-18 15:58:33,585] {__init__.py:1354} INFO - Starting attempt 7 of 7
[2019-06-18 15:58:33,585] {__init__.py:1355} INFO - 

[2019-06-18 15:58:33,594] {__init__.py:1374} INFO - Executing 
 on 2019-05-31T16:00:00+00:00
[2019-06-18 15:58:33,594] {base_task_runner.py:119} INFO - Running: ['airflow', 
'run', 'test_wordcount', 'test_wordcount', '2019-05-31T16:00:00+00:00', 
'--job_id', '413', '--raw', '-sd', 'DAGS_FOLDER/wordcount.py', '--cfg_path', 
'/tmp/tmpkqb2n943']
[2019-06-18 15:58:34,094] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount 
/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/configuration.py:590:
 DeprecationWarning: You have two airflow.cfg files: 
/opt/airflow/airflow/airflow.cfg and /opt/airflow/airflow.cfg. Airflow used to 
look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a different 
value. Airflow will now only read /opt/airflow/airflow.cfg, and you should 
remove the other file
[2019-06-18 15:58:34,095] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount   category=DeprecationWarning,
[2019-06-18 15:58:34,191] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,191] {settings.py:182} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, 
pid=30605
[2019-06-18 15:58:34,429] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,429] {default_celery.py:90} WARNING - You 
have configured a result_backend of redis://192.168.20.17/1, it is highly 
recommended to use an alternative result_backend (i.e. a database).
[2019-06-18 15:58:34,430] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,430] {__init__.py:51} INFO - Using executor 
CeleryExecutor
[2019-06-18 15:58:34,704] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,704] {__init__.py:305} INFO - Filling up 
the DagBag from /opt/airflow/dags/wordcount.py
[2019-06-18 15:58:34,754] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,754] {cli.py:517} INFO - Running 
 on host dc07
[2019-06-18 15:58:34,875] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:34,875] {base_hook.py:83} INFO - Using connection to: id: spark_default. 
Host: yarn, Port: None, Schema: None, Login: None, Password: None, extra: 
{'master': 'yarn', 'deploy-mode': 'cluster', 'queue': 'data', 'env_vars': 
{'HADOOP_USER_NAME': 'hdfs'}, 'spark_home': 
'/opt/cloudera/parcels/CDH/lib/spark/'}
[2019-06-18 15:58:34,876] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:34,876] {spark_submit_hook.py:295} INFO - Spark-Submit cmd: 
['spark-submit', '--master', 'yarn', '--py-files', 
'/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip', '--num-executors', 
'1', '--executor-cores', '1', '--executor-memory', '1g', '--driver-memory', 
'1g', '--name', 'test_wordcount', '--queue', 'data', '--deploy-mode', 
'cluster', '/opt/airflow/dags/main.py', '--job', 'wordcount', '--job-args', 
'input_path=/test/words.txt', 'output_path=/test/wordcount.csv']
[2019-06-18 15:58:41,557] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:41,557] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:44,791] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:44,790] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:44,929] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:44,929] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:45,061] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:45,061] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:45,186] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:45,186] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:45,320] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:45,320] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:45,602] 

[jira] [Created] (AIRFLOW-4806) Raise `Cannot execute error` even if that dag succeeded

2019-06-18 Thread kasim (JIRA)
kasim created AIRFLOW-4806:
--

 Summary: Raise `Cannot execute error` even if that dag succeeded
 Key: AIRFLOW-4806
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4806
 Project: Apache Airflow
  Issue Type: Bug
  Components: DagRun
Affects Versions: 1.10.3
Reporter: kasim


 

log : 

 

 
{code:java}
*** Log file does not exist: 
/opt/airflow/logs/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log
*** Fetching from: 
http://dc07:8793/log/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log

[2019-06-18 15:58:33,562] {__init__.py:1139} INFO - Dependencies all met for 

[2019-06-18 15:58:33,585] {__init__.py:1139} INFO - Dependencies all met for 

[2019-06-18 15:58:33,585] {__init__.py:1353} INFO - 

[2019-06-18 15:58:33,585] {__init__.py:1354} INFO - Starting attempt 7 of 7
[2019-06-18 15:58:33,585] {__init__.py:1355} INFO - 

[2019-06-18 15:58:33,594] {__init__.py:1374} INFO - Executing 
 on 2019-05-31T16:00:00+00:00
[2019-06-18 15:58:33,594] {base_task_runner.py:119} INFO - Running: ['airflow', 
'run', 'test_wordcount', 'test_wordcount', '2019-05-31T16:00:00+00:00', 
'--job_id', '413', '--raw', '-sd', 'DAGS_FOLDER/wordcount.py', '--cfg_path', 
'/tmp/tmpkqb2n943']
[2019-06-18 15:58:34,094] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount 
/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/configuration.py:590:
 DeprecationWarning: You have two airflow.cfg files: 
/opt/airflow/airflow/airflow.cfg and /opt/airflow/airflow.cfg. Airflow used to 
look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a different 
value. Airflow will now only read /opt/airflow/airflow.cfg, and you should 
remove the other file
[2019-06-18 15:58:34,095] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount   category=DeprecationWarning,
[2019-06-18 15:58:34,191] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,191] {settings.py:182} INFO - 
settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, 
pid=30605
[2019-06-18 15:58:34,429] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,429] {default_celery.py:90} WARNING - You 
have configured a result_backend of redis://192.168.20.17/1, it is highly 
recommended to use an alternative result_backend (i.e. a database).
[2019-06-18 15:58:34,430] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,430] {__init__.py:51} INFO - Using executor 
CeleryExecutor
[2019-06-18 15:58:34,704] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,704] {__init__.py:305} INFO - Filling up 
the DagBag from /opt/airflow/dags/wordcount.py
[2019-06-18 15:58:34,754] {base_task_runner.py:101} INFO - Job 413: Subtask 
test_wordcount [2019-06-18 15:58:34,754] {cli.py:517} INFO - Running 
 on host dc07
[2019-06-18 15:58:34,875] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:34,875] {base_hook.py:83} INFO - Using connection to: id: spark_default. 
Host: yarn, Port: None, Schema: None, Login: None, Password: None, extra: 
{'master': 'yarn', 'deploy-mode': 'cluster', 'queue': 'data', 'env_vars': 
{'HADOOP_USER_NAME': 'hdfs'}, 'spark_home': 
'/opt/cloudera/parcels/CDH/lib/spark/'}
[2019-06-18 15:58:34,876] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:34,876] {spark_submit_hook.py:295} INFO - Spark-Submit cmd: 
['spark-submit', '--master', 'yarn', '--py-files', 
'/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip', '--num-executors', 
'1', '--executor-cores', '1', '--executor-memory', '1g', '--driver-memory', 
'1g', '--name', 'test_wordcount', '--queue', 'data', '--deploy-mode', 
'cluster', '/opt/airflow/dags/main.py', '--job', 'wordcount', '--job-args', 
'input_path=/test/words.txt', 'output_path=/test/wordcount.csv']
[2019-06-18 15:58:41,557] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:41,557] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:44,791] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:44,790] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:44,929] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:44,929] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:45,061] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:45,061] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 15:58:45,186] {logging_mixin.py:95} INFO - [2019-06-18 
15:58:45,186] {spark_submit_hook.py:400} INFO - Identified spark driver id: 
application_1560762064551_0032
[2019-06-18 
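For reference, the Spark-Submit cmd in the log above is the kind of command a
SparkSubmitOperator task produces. A minimal sketch of such a task, assuming
Airflow 1.10.x and the spark_default connection shown in the log (the dag_id,
schedule and file paths here are illustrative):
{code:python}
# Sketch only: reconstructs a task that would produce a spark-submit command
# like the one in the log. master, deploy-mode and queue come from the
# spark_default connection extra, as shown above.
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG(
    'test_wordcount',
    start_date=datetime(2019, 6, 1),
    schedule_interval=None,
)

wordcount = SparkSubmitOperator(
    task_id='test_wordcount',
    conn_id='spark_default',
    application='/opt/airflow/dags/main.py',
    py_files='/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip',
    name='test_wordcount',
    num_executors=1,
    executor_cores=1,
    executor_memory='1g',
    driver_memory='1g',
    application_args=['--job', 'wordcount',
                      '--job-args',
                      'input_path=/test/words.txt',
                      'output_path=/test/wordcount.csv'],
    dag=dag,
)
{code}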

[jira] [Updated] (AIRFLOW-4801) trigger dag got when dag is zipped

2019-06-17 Thread kasim (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-4801:
---
Description: 
I have a dags.zip laid out as below:

 
{code:java}
Archive: dags.zip
extracting: jobs.zip
extracting: libs.zip
inflating: log4j.properties
inflating: main.py   # spark-submit python file
creating: resources/
creating: resources/result/
creating: resources/train/
inflating: resources/train/sale_count.parquet
inflating: resources/words.txt
inflating: salecount.py   # a dag containing a SparkSubmitOperator
inflating: test.py   # a very simple dag, like the airflow example
{code}
 

I always get the error below when I click `trigger dag` for test.py:

 
{code:java}
  / (  ()   )  \___
 /( (  (  )   _))  )   )\
   (( (   )()  )   (   )  )
 ((/  ( _(   )   (   _) ) (  () )  )
( (  ( (_)   (((   )  .((_ ) .  )_
   ( (  )(  (  ))   ) . ) (   )
  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
  ( (  (   ) (  )   (  )) ) _)(   )  )  )
 ( (  ( \ ) ((_  ( ) ( )  )   ) )  )) ( )
  (  (   (  (   (_ ( ) ( _)  ) (  )  )   )
 ( (  ( (  (  ) (_  )  ) )  _)   ) _( ( )
  ((  (   )(( _)   _) _(_ (  (_ )
   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
   ((__)\\||lll|l||///  \_))
(   /(/ (  )  ) )\   )
  (( ( ( | | ) ) )\   )
   (   /(| / ( )) ) ) )) )
 ( ( _(|)_) )
  (  ||\(|(|)|/|| )
(|(||(||))
  ( //|/l|||)|\\ \ )
(/ / //  /|//\\  \ \  \ _)
---
Node: dc09
---
Traceback (most recent call last):
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 2311, in wsgi_app
response = self.full_dispatch_request()
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1834, in full_dispatch_request
rv = self.handle_user_exception(e)
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1737, in handle_user_exception
reraise(exc_type, exc_value, tb)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/_compat.py", 
line 36, in reraise
raise value
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1832, in full_dispatch_request
rv = self.dispatch_request()
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1818, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", 
line 69, in inner
return self._run_view(f, *args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", 
line 368, in _run_view
return fn(self, *args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_login/utils.py", 
line 258, in decorated_view
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", 
line 275, in wrapper
return f(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", 
line 322, in wrapper
return f(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", 
line 73, in wrapper
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", 
line 1138, in trigger
external_trigger=True
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", 
line 73, in wrapper
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/__init__.py",
 line 2925, in create_dagrun
return self.get_dag().create_dagrun(run_id=run_id,
AttributeError: 'NoneType' object has no attribute 'create_dagrun'
{code}
If I extract only test.py and delete dags.zip, `trigger dag` for test.py works
fine.

If I extract all the files from dags.zip and delete dags.zip, `trigger dag`
for test.py still goes wrong.
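
A minimal diagnostic sketch (assuming Airflow 1.10.x and the dags folder seen
in the paths above) that shows whether the DAGs inside the zip can be loaded
at all; if 'test' is missing from `dagbag.dags`, `get_dag()` returns None and
the AttributeError above follows:
{code:python}
# Diagnostic sketch only: scan the dags folder (which contains dags.zip) the
# same way the scheduler does and report what loaded and what failed.
from airflow.models import DagBag

dagbag = DagBag('/opt/airflow/dags')

print('import errors :', dagbag.import_errors)   # per-file parse/import failures
print('loaded dag ids:', sorted(dagbag.dags))    # 'test' should appear if test.py parsed
{code}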

 

 

 

 

 

  was:
I have dags.zip like below :  

 
{code:java}
Archive: dags.zip
extracting: jobs.zip
extracting: libs.zip
inflating: log4j.properties
inflating: main.py
creating: resources/
creating: resources/result/
creating: resources/train/
inflating: resources/train/sale_count.parquet
inflating: 

[jira] [Updated] (AIRFLOW-4801) trigger dag got when dag is zipped

2019-06-17 Thread kasim (JIRA)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kasim updated AIRFLOW-4801:
---
Description: 
I have dags.zip like below :  

 
{code:java}
Archive: dags.zip
extracting: jobs.zip
extracting: libs.zip
inflating: log4j.properties
inflating: main.py
creating: resources/
creating: resources/result/
creating: resources/train/
inflating: resources/train/sale_count.parquet
inflating: resources/words.txt
inflating: salecount.py
inflating: test.py
{code}
 

Always got error when click `trigger dag`

 
{code:java}
  / (  ()   )  \___
 /( (  (  )   _))  )   )\
   (( (   )()  )   (   )  )
 ((/  ( _(   )   (   _) ) (  () )  )
( (  ( (_)   (((   )  .((_ ) .  )_
   ( (  )(  (  ))   ) . ) (   )
  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
  ( (  (   ) (  )   (  )) ) _)(   )  )  )
 ( (  ( \ ) ((_  ( ) ( )  )   ) )  )) ( )
  (  (   (  (   (_ ( ) ( _)  ) (  )  )   )
 ( (  ( (  (  ) (_  )  ) )  _)   ) _( ( )
  ((  (   )(( _)   _) _(_ (  (_ )
   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
   ((__)\\||lll|l||///  \_))
(   /(/ (  )  ) )\   )
  (( ( ( | | ) ) )\   )
   (   /(| / ( )) ) ) )) )
 ( ( _(|)_) )
  (  ||\(|(|)|/|| )
(|(||(||))
  ( //|/l|||)|\\ \ )
(/ / //  /|//\\  \ \  \ _)
---
Node: dc09
---
Traceback (most recent call last):
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 2311, in wsgi_app
response = self.full_dispatch_request()
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1834, in full_dispatch_request
rv = self.handle_user_exception(e)
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1737, in handle_user_exception
reraise(exc_type, exc_value, tb)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/_compat.py", 
line 36, in reraise
raise value
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1832, in full_dispatch_request
rv = self.dispatch_request()
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1818, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", 
line 69, in inner
return self._run_view(f, *args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", 
line 368, in _run_view
return fn(self, *args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_login/utils.py", 
line 258, in decorated_view
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", 
line 275, in wrapper
return f(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", 
line 322, in wrapper
return f(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", 
line 73, in wrapper
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", 
line 1138, in trigger
external_trigger=True
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", 
line 73, in wrapper
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/__init__.py",
 line 2925, in create_dagrun
return self.get_dag().create_dagrun(run_id=run_id,
AttributeError: 'NoneType' object has no attribute 'create_dagrun'
{code}
If I extracted test.py and delete dags.zip ,  `trigger dag ` would be fine .

If I extract all file in dags.zip and delete dags.zip ,  got wrong too .

 

 

 

 

 

  was:
I found this only happened with dags inside zip file , after I extract them, 
this problem gone .

 
{code:java}
  / (  ()   )  \___
 /( (  (  )   _))  )   )\
   (( (   )()  )   (   )  )
 ((/  ( _(   )   (   _) ) (  () )  )
( (  ( (_)   (((   )  .((_ ) .  )_
   ( (  )(  (  ))   ) . 

[jira] [Created] (AIRFLOW-4801) trigger dag got when dag is zipped

2019-06-17 Thread kasim (JIRA)
kasim created AIRFLOW-4801:
--

 Summary: trigger dag got when dag is zipped 
 Key: AIRFLOW-4801
 URL: https://issues.apache.org/jira/browse/AIRFLOW-4801
 Project: Apache Airflow
  Issue Type: Bug
  Components: DagRun
Affects Versions: 1.10.3
Reporter: kasim


I found this only happens with dags inside a zip file; after I extract them,
the problem is gone.

 
{code:java}
  / (  ()   )  \___
 /( (  (  )   _))  )   )\
   (( (   )()  )   (   )  )
 ((/  ( _(   )   (   _) ) (  () )  )
( (  ( (_)   (((   )  .((_ ) .  )_
   ( (  )(  (  ))   ) . ) (   )
  (  (   (  (   ) (  _  ( _) ).  ) . ) ) ( )
  ( (  (   ) (  )   (  )) ) _)(   )  )  )
 ( (  ( \ ) ((_  ( ) ( )  )   ) )  )) ( )
  (  (   (  (   (_ ( ) ( _)  ) (  )  )   )
 ( (  ( (  (  ) (_  )  ) )  _)   ) _( ( )
  ((  (   )(( _)   _) _(_ (  (_ )
   (_((__(_(__(( ( ( |  ) ) ) )_))__))_)___)
   ((__)\\||lll|l||///  \_))
(   /(/ (  )  ) )\   )
  (( ( ( | | ) ) )\   )
   (   /(| / ( )) ) ) )) )
 ( ( _(|)_) )
  (  ||\(|(|)|/|| )
(|(||(||))
  ( //|/l|||)|\\ \ )
(/ / //  /|//\\  \ \  \ _)
---
Node: dc09
---
Traceback (most recent call last):
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 2311, in wsgi_app
response = self.full_dispatch_request()
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1834, in full_dispatch_request
rv = self.handle_user_exception(e)
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1737, in handle_user_exception
reraise(exc_type, exc_value, tb)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/_compat.py", 
line 36, in reraise
raise value
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1832, in full_dispatch_request
rv = self.dispatch_request()
  File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", 
line 1818, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", 
line 69, in inner
return self._run_view(f, *args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", 
line 368, in _run_view
return fn(self, *args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_login/utils.py", 
line 258, in decorated_view
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", 
line 275, in wrapper
return f(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", 
line 322, in wrapper
return f(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", 
line 73, in wrapper
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", 
line 1138, in trigger
external_trigger=True
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", 
line 73, in wrapper
return func(*args, **kwargs)
  File 
"/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/__init__.py",
 line 2925, in create_dagrun
return self.get_dag().create_dagrun(run_id=run_id,
AttributeError: 'NoneType' object has no attribute 'create_dagrun'
{code}
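One assumption worth ruling out (an Airflow 1.10.x detail assumed here, not
confirmed in this report): in safe mode the DagBag only parses sources,
including members of a packaged zip, whose text contains both the strings
"airflow" and "DAG". A quick check against the zipped test.py (path
illustrative):
{code:python}
# Sketch of a safe-mode pre-check: a file is only treated as a DAG candidate
# if its source mentions both "airflow" and "DAG" (assumed 1.10.x heuristic).
import zipfile

with zipfile.ZipFile('/opt/airflow/dags/dags.zip') as zf:
    source = zf.read('test.py')
    # True means the safe-mode heuristic is not what is hiding test.py
    print(b'airflow' in source and b'DAG' in source)
{code}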
 

 

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)