[jira] [Updated] (AIRFLOW-5927) Airflow cache import file or variable
     [ https://issues.apache.org/jira/browse/AIRFLOW-5927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5927:
---------------------------
    Description: 

I have a `config.py` that pulls configuration from `Variable` and merges it into the default config:

{code:java}
from datetime import datetime
from airflow.models import Variable


class Config:
    version = "V21"

    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://x/data/dm/sales_forecast/fbprophet/version={version}'
    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'

    dag_start_date = datetime(2019, 10, 25)
    etl_start_time = "2019-06-01 00:00:00"
    etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"
    train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
    train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"
    report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
    report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main_flow = "45 2 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            elif v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
{code}

I have 5 DAG files that import this Config. They share similar code:

{code:java}
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
    'schedule_interval': schedule_interval,
}
{code}

{code:java}
dag = DAG('dm_sfx_etl_%s' % Config.version,
    start_date=Config.dag_start_date,
    default_args=default_args,
    schedule_interval='20 1 * * *',
    user_defined_filters={
        'mod': lambda s, d: s % d
    },
)
{code}

The strange thing is: changing `sf_schedule_etl` in the Variable took effect several times, but at some point I could no longer change it through the Variable, even when I hard-coded it directly:

```
dag = DAG('dm_sfx_etl_%s' % Config.version,
    start_date=Config.dag_start_date,
    default_args=default_args,
    schedule_interval='20 1 * * *',
    user_defined_filters={
        'mod': lambda s, d: s % d
    },
)
```

Once this happens, even deleting the DAG file and deleting the DAG from the Airflow web UI does not change `schedule_interval`.

PS: my DAG files have been running for some days; during that time I may have added operators or changed operator types, and everything was still fine. I think some cache in Airflow must be causing this problem.
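A quick way to narrow down where the stale value lives (a sketch, assuming Airflow 1.10.x; the folder path and dag_id below are taken from this report and may need adjusting) is to parse the DAG file with a fresh `DagBag` on the scheduler host and print what it yields, independently of the web UI and the metadata database:

```python
from airflow.models import DagBag

# Parse the DAG folder from scratch and print the schedule the file actually produces.
dagbag = DagBag(dag_folder='/opt/workflow/airflow/dags/sf_dags_n')
dag = dagbag.get_dag('dm_sfx_etl_V21')
print(dag.schedule_interval)
print(dag.start_date)
```

If this prints the new value while the UI and new DAG runs still show the old one, the stale copy is being held by a long-running webserver or scheduler process rather than by the file itself.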
[jira] [Created] (AIRFLOW-5927) Airflow cache import file or variable
kasim created AIRFLOW-5927:
-------------------------------

             Summary: Airflow cache import file or variable
                 Key: AIRFLOW-5927
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5927
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG, database
    Affects Versions: 1.10.3
            Reporter: kasim


I have a `config.py` that pulls configuration from `Variable` and merges it into the default config:

```python
from datetime import datetime
from airflow.models import Variable


class Config:
    version = "V21"

    etl_feature_dir = f'/data/dm/sales_forecast/features/version={version}'
    forecast_result_dir = f'/data/dm/sales_forecast/results/fbprophet/version={version}'
    forecast_model_dir = f'/data/dm/sales_forecast/models/fbprophet/version={version}'
    forecast_result_s3_dir = f's3a://pupumall-dc-tmp/data/dm/sales_forecast/fbprophet/version={version}'
    etl_dir = '/data/dm/sales_forecast/etl'
    feature_current_path = 'hdfs://pupuxdc/test/MERGE_OUT'

    dag_start_date = datetime(2019, 10, 25)
    etl_start_time = "2019-06-01 00:00:00"
    etl_end_time = "{{ (execution_date + macros.timedelta(days=8)).strftime('%Y-%m-%d 00:00:00') }}"
    train_start_time = "{{ (execution_date - macros.timedelta(days=90)).strftime('%Y-%m-%d 00:00:00') }}"
    train_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_start_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"
    predict_end_time = "{{ (execution_date + macros.timedelta(days=7)).strftime('%Y-%m-%d 00:00:00') }}"
    report_start_date = "{{ (execution_date - macros.timedelta(days=6)).strftime('%Y-%m-%d 00:00:00') }}"
    report_end_date = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"

    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main_flow = "45 2 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                print(datetime.strptime(v, '%Y-%m-%d'))
                print(type(datetime.strptime(v, '%Y-%m-%d')))
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
                print(Config.dag_start_date)
                print(type(Config.dag_start_date))
            if v == 'None':
                setattr(Config, k, None)
            else:
                setattr(Config, k, v)
```

I have 5 DAG files that import this Config. They share similar code:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable

from sf_dags_n.config import Config

default_args = {
    'owner': 'mithril',
    'depends_on_past': False,
    'email': ['mithril'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 2,
}

print(Config.dag_start_date)
print(type(Config.dag_start_date))

dag = DAG('dm_sfx_etl_%s' % Config.version,
    start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d'),
    default_args=default_args,
    schedule_interval=Config.sf_schedule_etl,
    user_defined_filters={
        'mod': lambda s, d: s % d
    },
)

# other codes
```

The strange things are:

1. At first, in the DAG files that do `from sf_dags_n.config import Config`, `Config.dag_start_date` was of type datetime. But it became str several days ago: when I check its type, in `config.py` `Config.dag_start_date` is still datetime, but in the DAG file it is str. As I remember, I originally defined the default `Config.dag_start_date` as a str, but changed it to datetime when the problem occurred. Then it was fine for some time before it went wrong again.

2. Changing `sf_schedule_etl` in the Variable took effect several times, but at some point I could no longer change it through the Variable, even when I hard-coded it directly:

```
dag = DAG('dm_sfx_etl_%s' % Config.version,
    start_date=datetime.strptime(Config.dag_start_date, '%Y-%m-%d'),
    default_args=default_args,
    schedule_interval='20 1 * * *',
    user_defined_filters={
        'mod': lambda s, d: s % d
    },
)
```

Once this happens, even deleting the DAG file and deleting the DAG from the Airflow web UI does not change `schedule_interval`.

PS: my DAG files have been running for some days; during that time I may have added operators or changed operator types, and everything was still fine. I think some cache in Airflow must be causing this problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
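A defensive pattern for the first symptom (a sketch, not part of the reporter's code; the helper name `as_datetime` is hypothetical) is to normalize `dag_start_date` so the DAG file works whether the merge step leaves a datetime or a 'YYYY-MM-DD' string behind:

```python
from datetime import datetime


def as_datetime(value):
    """Return a datetime whether `value` is already one or a 'YYYY-MM-DD' string."""
    if isinstance(value, datetime):
        return value
    return datetime.strptime(value, '%Y-%m-%d')


# Usage in the DAG file, e.g.: start_date=as_datetime(Config.dag_start_date)
```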
[jira] [Created] (AIRFLOW-5795) Airflow cached old Code and Variables
kasim created AIRFLOW-5795:
-------------------------------

             Summary: Airflow cached old Code and Variables
                 Key: AIRFLOW-5795
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5795
             Project: Apache Airflow
          Issue Type: Bug
          Components: DAG, database
    Affects Versions: 1.10.3
            Reporter: kasim


My DAG start_date is configured in Variables:

{code:java}
from datetime import datetime
from airflow.models import Variable


class Config(object):
    version = "V21"

    dag_start_date = datetime(2019, 10, 25)
    sf_schedule_report = "30 8 * * *"
    sf_schedule_etl = '30 1 * * *'
    sf_schedule_main = "45 3,4,5,6,7,8 * * *"


CONFIG_KEY = 'sf_config_%s' % Config.version
sf_config = Variable.get(CONFIG_KEY, deserialize_json=True, default_var={})

if sf_config:
    for k, v in sf_config.items():
        print(f'Overwrite {k} by {v}')
        if hasattr(Config, k):
            if k == 'dag_start_date':
                setattr(Config, k, datetime.strptime(v, '%Y-%m-%d'))
            else:
                setattr(Config, k, v)

print(' config ### \n\n')
print(f'CONFIG_KEY: {CONFIG_KEY}')
print(f'CONFIG DICT: {sf_config}')
print(' config end ### \n\n')
{code}

My DAG is initialized as:

{code:java}
dag = DAG('dm_sf_etl_%s' % Config.version,
    start_date=Config.dag_start_date,
    default_args=default_args,
    schedule_interval=schedule,
    user_defined_filters={
        'mod': lambda s, d: s % d
    },
)
{code}

The Variable is:

!image-2019-10-28-12-25-56-300.png!

But the log shows:
* Airflow tried 4 times
* it did not read the Variable on the first attempt
* it used a cached start_date `2019-10-17` and schedule `45 1,2,3,4,5,6 * * *` from somewhere (these are the old Variable settings from 10 days ago)
* it did read the new Variable on a later attempt

I have tried deleting the files, deleting the DAG in the web UI, and deleting the related task_instance rows in the database, but I still get the same error. I think there is some cache in the scheduler's memory; I can't restart Airflow, so I can't confirm, but this shouldn't happen.

{code:java}
*** Reading local file: /data/opt/workflow/airflow/logs/dm_sf_main_V21/branch_task/2019-10-17T01:45:00+08:00/1.log
[2019-10-17 02:45:15,628] {__init__.py:1139} INFO - Dependencies all met for 
[2019-10-17 02:45:15,644] {__init__.py:1139} INFO - Dependencies all met for 
[2019-10-17 02:45:15,644] {__init__.py:1353} INFO - 
[2019-10-17 02:45:15,645] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-10-17 02:45:15,645] {__init__.py:1355} INFO - 
[2019-10-17 02:45:15,702] {__init__.py:1374} INFO - Executing  on 2019-10-17T01:45:00+08:00
[2019-10-17 02:45:15,702] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'dm_sf_main_V21', 'branch_task', '2019-10-17T01:45:00+08:00', '--job_id', '154142', '--raw', '-sd', 'DAGS_FOLDER/sf_dags_n/dm_sf_main.py', '--cfg_path', '/tmp/tmp4p1ml6pq']
[2019-10-17 02:45:16,180] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,179] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=17786
[2019-10-17 02:45:16,341] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,340] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-10-17 02:45:16,643] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task [2019-10-17 02:45:16,642] {__init__.py:305} INFO - Filling up the DagBag from /opt/workflow/airflow/dags/sf_dags_n/dm_sf_main.py
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task schedule: 45 1,2,3,4,5,6 * * *
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task divisor: 5
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ### found sales_forecast_prophet in operator_pyspark_conf, overwrite default conf !
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ## Job[sales_forecast_prophet] Command-line :
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task ---
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task spark-submitx
[2019-10-17 02:45:16,704] {base_task_runner.py:101} INFO - Job 154142: Subtask branch_task 
[2019-10-17 02:45:16,704]
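One way to reduce the impact of stale parse-time values (a sketch, assuming Airflow 1.10.x; the task below is illustrative and not from this report) is to resolve the Variable inside a templated field, so it is read when the task executes rather than when the scheduler parses the file. Note that scheduling fields such as `schedule_interval` and `start_date` are evaluated at parse time and cannot be templated this way.

```python
from airflow.operators.bash_operator import BashOperator

# `{{ var.json.sf_config_V21 }}` resolves the JSON Variable at task run time,
# not at DAG-parse time. `dag` is assumed to be the DAG defined above.
show_config = BashOperator(
    task_id='show_sf_config',
    bash_command='echo "{{ var.json.sf_config_V21 }}"',
    dag=dag,
)
```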
[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Variables
     [ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5795:
---------------------------
    Attachment:     (was: 1)A]~UL_N@TVU072M)68WHP.png)
[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Variables
     [ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5795:
---------------------------
    Attachment: image-2019-10-28-12-32-55-531.png
[jira] [Updated] (AIRFLOW-5795) Airflow cached old Code and Variables
     [ https://issues.apache.org/jira/browse/AIRFLOW-5795?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5795:
---------------------------
    Attachment: 1)A]~UL_N@TVU072M)68WHP.png
[jira] [Closed] (AIRFLOW-4806) Raise `Cannot execute error` even if that dag succeeded
[ https://issues.apache.org/jira/browse/AIRFLOW-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim closed AIRFLOW-4806. -- Resolution: Not A Bug > Raise `Cannot execute error` even if that dag successed > --- > > Key: AIRFLOW-4806 > URL: https://issues.apache.org/jira/browse/AIRFLOW-4806 > Project: Apache Airflow > Issue Type: Bug > Components: DagRun >Affects Versions: 1.10.3 >Reporter: kasim >Priority: Major > > > airflow log : > > {code:java} > *** Log file does not exist: > /opt/airflow/logs/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log > *** Fetching from: > http://dc07:8793/log/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log > [2019-06-18 15:58:33,562] {__init__.py:1139} INFO - Dependencies all met for > [queued]> > [2019-06-18 15:58:33,585] {__init__.py:1139} INFO - Dependencies all met for > [queued]> > [2019-06-18 15:58:33,585] {__init__.py:1353} INFO - > > [2019-06-18 15:58:33,585] {__init__.py:1354} INFO - Starting attempt 7 of 7 > [2019-06-18 15:58:33,585] {__init__.py:1355} INFO - > > [2019-06-18 15:58:33,594] {__init__.py:1374} INFO - Executing > on 2019-05-31T16:00:00+00:00 > [2019-06-18 15:58:33,594] {base_task_runner.py:119} INFO - Running: > ['airflow', 'run', 'test_wordcount', 'test_wordcount', > '2019-05-31T16:00:00+00:00', '--job_id', '413', '--raw', '-sd', > 'DAGS_FOLDER/wordcount.py', '--cfg_path', '/tmp/tmpkqb2n943'] > [2019-06-18 15:58:34,094] {base_task_runner.py:101} INFO - Job 413: Subtask > test_wordcount > /opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/configuration.py:590: > DeprecationWarning: You have two airflow.cfg files: > /opt/airflow/airflow/airflow.cfg and /opt/airflow/airflow.cfg. Airflow used > to look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a > different value. Airflow will now only read /opt/airflow/airflow.cfg, and you > should remove the other file > [2019-06-18 15:58:34,095] {base_task_runner.py:101} INFO - Job 413: Subtask > test_wordcount category=DeprecationWarning, > [2019-06-18 15:58:34,191] {base_task_runner.py:101} INFO - Job 413: Subtask > test_wordcount [2019-06-18 15:58:34,191] {settings.py:182} INFO - > settings.configure_orm(): Using pool settings. pool_size=5, > pool_recycle=1800, pid=30605 > [2019-06-18 15:58:34,429] {base_task_runner.py:101} INFO - Job 413: Subtask > test_wordcount [2019-06-18 15:58:34,429] {default_celery.py:90} WARNING - You > have configured a result_backend of redis://192.168.20.17/1, it is highly > recommended to use an alternative result_backend (i.e. a database). > [2019-06-18 15:58:34,430] {base_task_runner.py:101} INFO - Job 413: Subtask > test_wordcount [2019-06-18 15:58:34,430] {__init__.py:51} INFO - Using > executor CeleryExecutor > [2019-06-18 15:58:34,704] {base_task_runner.py:101} INFO - Job 413: Subtask > test_wordcount [2019-06-18 15:58:34,704] {__init__.py:305} INFO - Filling up > the DagBag from /opt/airflow/dags/wordcount.py > [2019-06-18 15:58:34,754] {base_task_runner.py:101} INFO - Job 413: Subtask > test_wordcount [2019-06-18 15:58:34,754] {cli.py:517} INFO - Running > [running]> on host dc07 > [2019-06-18 15:58:34,875] {logging_mixin.py:95} INFO - [2019-06-18 > 15:58:34,875] {base_hook.py:83} INFO - Using connection to: id: > spark_default. 
Host: yarn, Port: None, Schema: None, Login: None, Password: > None, extra: {'master': 'yarn', 'deploy-mode': 'cluster', 'queue': 'data', > 'env_vars': {'HADOOP_USER_NAME': 'hdfs'}, 'spark_home': > '/opt/cloudera/parcels/CDH/lib/spark/'} > [2019-06-18 15:58:34,876] {logging_mixin.py:95} INFO - [2019-06-18 > 15:58:34,876] {spark_submit_hook.py:295} INFO - Spark-Submit cmd: > ['spark-submit', '--master', 'yarn', '--py-files', > '/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip', '--num-executors', > '1', '--executor-cores', '1', '--executor-memory', '1g', '--driver-memory', > '1g', '--name', 'test_wordcount', '--queue', 'data', '--deploy-mode', > 'cluster', '/opt/airflow/dags/main.py', '--job', 'wordcount', '--job-args', > 'input_path=/test/words.txt', 'output_path=/test/wordcount.csv'] > [2019-06-18 15:58:41,557] {logging_mixin.py:95} INFO - [2019-06-18 > 15:58:41,557] {spark_submit_hook.py:400} INFO - Identified spark driver id: > application_1560762064551_0032 > [2019-06-18 15:58:44,791] {logging_mixin.py:95} INFO - [2019-06-18 > 15:58:44,790] {spark_submit_hook.py:400} INFO - Identified spark driver id: > application_1560762064551_0032 > [2019-06-18 15:58:44,929] {logging_mixin.py:95} INFO - [2019-06-18 > 15:58:44,929] {spark_submit_hook.py:400} INFO - Identified spark
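For context, the `Spark-Submit cmd` line in the log above is what a `SparkSubmitOperator` builds from its arguments plus the `spark_default` connection extras (master, deploy-mode, queue, env_vars). A minimal sketch of such a task (an assumed reconstruction from the log, not the reporter's actual DAG file):

```python
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Sketch: mirrors the options visible in the logged command (py-files, 1 executor, 1g memory).
# Cluster deploy-mode, the 'data' queue and HADOOP_USER_NAME come from the
# `spark_default` connection extras shown in the log.
wordcount = SparkSubmitOperator(
    task_id='test_wordcount',
    conn_id='spark_default',
    application='/opt/airflow/dags/main.py',
    py_files='/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip',
    num_executors=1,
    executor_cores=1,
    executor_memory='1g',
    driver_memory='1g',
    name='test_wordcount',
    application_args=['--job', 'wordcount', '--job-args',
                      'input_path=/test/words.txt', 'output_path=/test/wordcount.csv'],
    dag=dag,  # assumes a DAG object defined elsewhere
)
```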
[jira] [Updated] (AIRFLOW-5538) Add a flag to make scheduling trigger on start_date instead of execution_date (make execution_date equal to start_date)
     [ https://issues.apache.org/jira/browse/AIRFLOW-5538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

kasim updated AIRFLOW-5538:
---------------------------
    Description: 

From [https://airflow.apache.org/scheduler.html]:

> Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

This behavior is very painful.

For example, I have an ETL job that runs every day with schedule_interval `0 1 * * *`, so the 2019-09-22 01:00:00 run is triggered on 2019-09-23 01:00:00. But my ETL processes all data before the start date, i.e. the data range is (history, 2019-09-23 00:00:00), and we can't use `datetime.now()` because that is not reproducible. This forces me to add 1 day to execution_date:

```python
etl_end_time = "{{ (execution_date + macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}"
```

However, when I need to run a job with schedule_interval `45 2,3,4,5,6 * * *`, the `2019-09-22 06:45:00` run would execute at `2019-09-23 02:45:00`, the day after its execution_date. Instead of adding a day, I had to change the schedule_interval to `45 2,3,4,5,6,7 * * *` and put a dummy operator on the last run. And in this situation you don't need to add one day to execution_date, which means you have to define two `etl_end_time` values to represent the same date in jobs with different schedule_intervals.

All of this is very uncomfortable for me; adding a config option or built-in method to make execution_date equal to start_date would be very nice. If that existed, there would only be one `etl_end_time = "{{ execution_date.strftime('%Y-%m-%d 00:00:00') }}"`.
> Add a flag to make scheduling trigger on start_date instead of execution_date > (make execution_date equal to start_date) > --- > > Key: AIRFLOW-5538 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5538 > Project: Apache Airflow > Issue Type: Improvement > Components: DagRun >Affects Versions: 1.10.5 >Reporter: kasim >Priority: Major > > From [https://airflow.apache.org/scheduler.html] : > > Note that if you run a DAG on a schedule_interval of one day, the run > > stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In > > other words, the job instance is started once the period it covers has > > ended. > This feature is very hurt . > For example I have etl job which run every day, schedule_interval is `0 1 * * > *`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But > my etl is processing all data before start_date , means data range is between > (history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because > this is unable to reproduce. This force me add 1 day to execution_date: > ```python > etl_end_time = "\{{ (execution_date + > macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}" > ``` > However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , > the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is > the day after execution_date . Instead of adding a day, I had to changed > schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run. > And in this situation , you don't need add one day to
[jira] [Updated] (AIRFLOW-5538) Add a flag to make scheduling trigger on start_date instead of execution_date (make execution_date equal to start_date)
[ https://issues.apache.org/jira/browse/AIRFLOW-5538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-5538: --- Description: From [https://airflow.apache.org/scheduler.html] : > Note that if you run a DAG on a schedule_interval of one day, the run > stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In > other words, the job instance is started once the period it covers has > ended. This behavior is very painful. For example, I have an ETL job that runs every day with schedule_interval `0 1 * * *`, so the run stamped 2019-09-22 01:00:00 is triggered at 2019-09-23 01:00:00. But my ETL processes all data available before the run actually starts, i.e. the data range is (history, 2019-09-23 00:00:00), and we can't use `datetime.now()` because that would not be reproducible. This forces me to add one day to execution_date: ```python etl_end_time = "\{{ (execution_date + macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}" ``` However, when I need to run a job with schedule_interval `45 2,3,4,5,6 * * *`, the `2019-09-22 06:45:00` run executes at `2019-09-23 02:45:00`, the day after its execution_date. Instead of adding a day, I had to change the schedule_interval to `45 2,3,4,5,6,7 * * *` and put a dummy operator on the last run. In this situation you don't need to add one day to execution_date, which means you have to define two different `etl_end_time` expressions to represent the same date in jobs with different schedule_intervals. All of this is very uncomfortable for me; adding a config option or built-in method to make execution_date equal to start_date would be very nice. was: From https://airflow.apache.org/scheduler.html : > Note that if you run a DAG on a schedule_interval of one day, the run > stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In > other words, the job instance is started once the period it covers has > ended. This behavior is very painful. For example, I have an ETL job that runs every day with schedule_interval `0 1 * * *`, so the run stamped 2019-09-22 01:00:00 is triggered at 2019-09-23 01:00:00. But my ETL processes all data available before the run actually starts, i.e. the data range is (history, 2019-09-23 00:00:00), and we can't use `datetime.now()` because that would not be reproducible. This forces me to add one day to execution_date: ```python etl_end_time = "\{{ (execution_date + macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}" ``` However, when I need to run a job with schedule_interval `45 2,3,4,5,6 * * *`, the `2019-09-22 06:45:00` run executes at `2019-09-23 02:45:00`, the day after its execution_date. Instead of adding a day, I had to change the schedule_interval to `45 2,3,4,5,6,7 * * *` and put a dummy operator on the last run. In this situation you don't need to add one day to execution_date, which means you have to define two different `etl_end_time` expressions to represent the same date in jobs with different schedule_intervals. All of this is very uncomfortable for me; is there any config or built-in method to make execution_date equal to start_date? Or do I have to modify the Airflow source code ...
> Add a flag to make scheduling trigger on start_date instead of execution_date > (make execution_date equal to start_date) > --- > > Key: AIRFLOW-5538 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5538 > Project: Apache Airflow > Issue Type: Improvement > Components: DagRun >Affects Versions: 1.10.5 >Reporter: kasim >Priority: Major > > From [https://airflow.apache.org/scheduler.html] : > > Note that if you run a DAG on a schedule_interval of one day, the run > > stamped 2016-01-01 will be trigger soon after 2016-01-01T23:59. In > > other words, the job instance is started once the period it covers has > > ended. > This feature is very hurt . > For example I have etl job which run every day, schedule_interval is `0 1 * * > *`, so it will trigger 2019-09-22 01:00:00 job on 2019-09-23 01:00:00 . But > my etl is processing all data before start_date , means data range is between > (history, 2019-09-23 00:00:00) , and we can't use `datetime.now()` because > this is unable to reproduce. This force me add 1 day to execution_date: > ```python > etl_end_time = "\{{ (execution_date + > macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}" > ``` > However, when I need run a job with schedule_interval `45 2,3,4,5,6 * * *` , > the `2019-09-22 06:45:00` job would run on `2019-09-23 02:45:00`, which is > the day after execution_date . Instead of adding a day, I had to changed > schedule_interval `45 2,3,4,5,6,7 * * *` and put a dummy operator on last run. > And in this situation , you don't need add one day to execution_date , this > means you have to define two `etl_end_time` to represent a same date in
[jira] [Created] (AIRFLOW-5538) Add a flag to make scheduling trigger on start_date instead of execution_date (make execution_date equal to start_date)
kasim created AIRFLOW-5538: -- Summary: Add a flag to make scheduling trigger on start_date instead of execution_date (make execution_date equal to start_date) Key: AIRFLOW-5538 URL: https://issues.apache.org/jira/browse/AIRFLOW-5538 Project: Apache Airflow Issue Type: Improvement Components: DagRun Affects Versions: 1.10.5 Reporter: kasim From https://airflow.apache.org/scheduler.html : > Note that if you run a DAG on a schedule_interval of one day, the run > stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In > other words, the job instance is started once the period it covers has > ended. This behavior is very painful. For example, I have an ETL job that runs every day with schedule_interval `0 1 * * *`, so the run stamped 2019-09-22 01:00:00 is triggered at 2019-09-23 01:00:00. But my ETL processes all data available before the run actually starts, i.e. the data range is (history, 2019-09-23 00:00:00), and we can't use `datetime.now()` because that would not be reproducible. This forces me to add one day to execution_date: ```python etl_end_time = "\{{ (execution_date + macros.timedelta(days=1)).strftime('%Y-%m-%d 00:00:00') }}" ``` However, when I need to run a job with schedule_interval `45 2,3,4,5,6 * * *`, the `2019-09-22 06:45:00` run executes at `2019-09-23 02:45:00`, the day after its execution_date. Instead of adding a day, I had to change the schedule_interval to `45 2,3,4,5,6,7 * * *` and put a dummy operator on the last run. In this situation you don't need to add one day to execution_date, which means you have to define two different `etl_end_time` expressions to represent the same date in jobs with different schedule_intervals. All of this is very uncomfortable for me; is there any config or built-in method to make execution_date equal to start_date? Or do I have to modify the Airflow source code ... -- This message was sent by Atlassian Jira (v8.3.4#803005)
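A minimal sketch of how the requested behaviour can be approximated today, assuming Airflow 1.10.x: the template context already exposes `next_execution_date`, the start of the interval that follows the run being executed, which for a regular schedule is roughly the wall-clock time the run actually fires. Rendering the end of the data range from it gives a single `etl_end_time` that works for any schedule_interval, without adding a day by hand. The DAG id and task below are illustrative, not taken from the report.
{code:python}
# Sketch under the assumptions above; the dag_id and task are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# next_execution_date marks the end of the interval the run covers, so one
# template expression serves daily jobs and multi-run-per-day jobs alike.
ETL_END_TIME = "{{ next_execution_date.strftime('%Y-%m-%d 00:00:00') }}"

default_args = {"owner": "airflow", "retries": 0}

dag = DAG(
    "example_etl_end_time",                  # hypothetical dag_id
    start_date=datetime(2019, 10, 25),
    schedule_interval="45 2,3,4,5,6 * * *",  # the schedule from the report
    default_args=default_args,
)

run_etl = BashOperator(
    task_id="run_etl",
    # bash_command is a templated field, so the expression is rendered per run
    bash_command="echo processing data up to '" + ETL_END_TIME + "'",
    dag=dag,
)
{code}
Whether `next_execution_date` matches the intended "start date of the run" exactly depends on the schedule (it is the end of the covered interval, not the moment the task really starts), so this is a workaround rather than the flag the report asks for.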
[jira] [Updated] (AIRFLOW-4806) Raise `Cannot execute error` even if that dag succeeded
[ https://issues.apache.org/jira/browse/AIRFLOW-4806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-4806: --- Description: airflow log : {code:java} *** Log file does not exist: /opt/airflow/logs/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log *** Fetching from: http://dc07:8793/log/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log [2019-06-18 15:58:33,562] {__init__.py:1139} INFO - Dependencies all met for [2019-06-18 15:58:33,585] {__init__.py:1139} INFO - Dependencies all met for [2019-06-18 15:58:33,585] {__init__.py:1353} INFO - [2019-06-18 15:58:33,585] {__init__.py:1354} INFO - Starting attempt 7 of 7 [2019-06-18 15:58:33,585] {__init__.py:1355} INFO - [2019-06-18 15:58:33,594] {__init__.py:1374} INFO - Executing on 2019-05-31T16:00:00+00:00 [2019-06-18 15:58:33,594] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'test_wordcount', 'test_wordcount', '2019-05-31T16:00:00+00:00', '--job_id', '413', '--raw', '-sd', 'DAGS_FOLDER/wordcount.py', '--cfg_path', '/tmp/tmpkqb2n943'] [2019-06-18 15:58:34,094] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount /opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/configuration.py:590: DeprecationWarning: You have two airflow.cfg files: /opt/airflow/airflow/airflow.cfg and /opt/airflow/airflow.cfg. Airflow used to look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a different value. Airflow will now only read /opt/airflow/airflow.cfg, and you should remove the other file [2019-06-18 15:58:34,095] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount category=DeprecationWarning, [2019-06-18 15:58:34,191] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,191] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=30605 [2019-06-18 15:58:34,429] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,429] {default_celery.py:90} WARNING - You have configured a result_backend of redis://192.168.20.17/1, it is highly recommended to use an alternative result_backend (i.e. a database). [2019-06-18 15:58:34,430] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,430] {__init__.py:51} INFO - Using executor CeleryExecutor [2019-06-18 15:58:34,704] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,704] {__init__.py:305} INFO - Filling up the DagBag from /opt/airflow/dags/wordcount.py [2019-06-18 15:58:34,754] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,754] {cli.py:517} INFO - Running on host dc07 [2019-06-18 15:58:34,875] {logging_mixin.py:95} INFO - [2019-06-18 15:58:34,875] {base_hook.py:83} INFO - Using connection to: id: spark_default. 
Host: yarn, Port: None, Schema: None, Login: None, Password: None, extra: {'master': 'yarn', 'deploy-mode': 'cluster', 'queue': 'data', 'env_vars': {'HADOOP_USER_NAME': 'hdfs'}, 'spark_home': '/opt/cloudera/parcels/CDH/lib/spark/'} [2019-06-18 15:58:34,876] {logging_mixin.py:95} INFO - [2019-06-18 15:58:34,876] {spark_submit_hook.py:295} INFO - Spark-Submit cmd: ['spark-submit', '--master', 'yarn', '--py-files', '/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip', '--num-executors', '1', '--executor-cores', '1', '--executor-memory', '1g', '--driver-memory', '1g', '--name', 'test_wordcount', '--queue', 'data', '--deploy-mode', 'cluster', '/opt/airflow/dags/main.py', '--job', 'wordcount', '--job-args', 'input_path=/test/words.txt', 'output_path=/test/wordcount.csv'] [2019-06-18 15:58:41,557] {logging_mixin.py:95} INFO - [2019-06-18 15:58:41,557] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:44,791] {logging_mixin.py:95} INFO - [2019-06-18 15:58:44,790] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:44,929] {logging_mixin.py:95} INFO - [2019-06-18 15:58:44,929] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:45,061] {logging_mixin.py:95} INFO - [2019-06-18 15:58:45,061] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:45,186] {logging_mixin.py:95} INFO - [2019-06-18 15:58:45,186] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:45,320] {logging_mixin.py:95} INFO - [2019-06-18 15:58:45,320] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:45,602]
[jira] [Created] (AIRFLOW-4806) Raise `Cannot execute error` even if that dag succeeded
kasim created AIRFLOW-4806: -- Summary: Raise `Cannot execute error` even if that dag successed Key: AIRFLOW-4806 URL: https://issues.apache.org/jira/browse/AIRFLOW-4806 Project: Apache Airflow Issue Type: Bug Components: DagRun Affects Versions: 1.10.3 Reporter: kasim log : {code:java} *** Log file does not exist: /opt/airflow/logs/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log *** Fetching from: http://dc07:8793/log/test_wordcount/test_wordcount/2019-05-31T16:00:00+00:00/7.log [2019-06-18 15:58:33,562] {__init__.py:1139} INFO - Dependencies all met for [2019-06-18 15:58:33,585] {__init__.py:1139} INFO - Dependencies all met for [2019-06-18 15:58:33,585] {__init__.py:1353} INFO - [2019-06-18 15:58:33,585] {__init__.py:1354} INFO - Starting attempt 7 of 7 [2019-06-18 15:58:33,585] {__init__.py:1355} INFO - [2019-06-18 15:58:33,594] {__init__.py:1374} INFO - Executing on 2019-05-31T16:00:00+00:00 [2019-06-18 15:58:33,594] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'test_wordcount', 'test_wordcount', '2019-05-31T16:00:00+00:00', '--job_id', '413', '--raw', '-sd', 'DAGS_FOLDER/wordcount.py', '--cfg_path', '/tmp/tmpkqb2n943'] [2019-06-18 15:58:34,094] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount /opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/configuration.py:590: DeprecationWarning: You have two airflow.cfg files: /opt/airflow/airflow/airflow.cfg and /opt/airflow/airflow.cfg. Airflow used to look at ~/airflow/airflow.cfg, even when AIRFLOW_HOME was set to a different value. Airflow will now only read /opt/airflow/airflow.cfg, and you should remove the other file [2019-06-18 15:58:34,095] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount category=DeprecationWarning, [2019-06-18 15:58:34,191] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,191] {settings.py:182} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=30605 [2019-06-18 15:58:34,429] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,429] {default_celery.py:90} WARNING - You have configured a result_backend of redis://192.168.20.17/1, it is highly recommended to use an alternative result_backend (i.e. a database). [2019-06-18 15:58:34,430] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,430] {__init__.py:51} INFO - Using executor CeleryExecutor [2019-06-18 15:58:34,704] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,704] {__init__.py:305} INFO - Filling up the DagBag from /opt/airflow/dags/wordcount.py [2019-06-18 15:58:34,754] {base_task_runner.py:101} INFO - Job 413: Subtask test_wordcount [2019-06-18 15:58:34,754] {cli.py:517} INFO - Running on host dc07 [2019-06-18 15:58:34,875] {logging_mixin.py:95} INFO - [2019-06-18 15:58:34,875] {base_hook.py:83} INFO - Using connection to: id: spark_default. 
Host: yarn, Port: None, Schema: None, Login: None, Password: None, extra: {'master': 'yarn', 'deploy-mode': 'cluster', 'queue': 'data', 'env_vars': {'HADOOP_USER_NAME': 'hdfs'}, 'spark_home': '/opt/cloudera/parcels/CDH/lib/spark/'} [2019-06-18 15:58:34,876] {logging_mixin.py:95} INFO - [2019-06-18 15:58:34,876] {spark_submit_hook.py:295} INFO - Spark-Submit cmd: ['spark-submit', '--master', 'yarn', '--py-files', '/opt/airflow/dags/jobs.zip,/opt/airflow/dags/libs.zip', '--num-executors', '1', '--executor-cores', '1', '--executor-memory', '1g', '--driver-memory', '1g', '--name', 'test_wordcount', '--queue', 'data', '--deploy-mode', 'cluster', '/opt/airflow/dags/main.py', '--job', 'wordcount', '--job-args', 'input_path=/test/words.txt', 'output_path=/test/wordcount.csv'] [2019-06-18 15:58:41,557] {logging_mixin.py:95} INFO - [2019-06-18 15:58:41,557] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:44,791] {logging_mixin.py:95} INFO - [2019-06-18 15:58:44,790] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:44,929] {logging_mixin.py:95} INFO - [2019-06-18 15:58:44,929] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:45,061] {logging_mixin.py:95} INFO - [2019-06-18 15:58:45,061] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18 15:58:45,186] {logging_mixin.py:95} INFO - [2019-06-18 15:58:45,186] {spark_submit_hook.py:400} INFO - Identified spark driver id: application_1560762064551_0032 [2019-06-18
[jira] [Updated] (AIRFLOW-4801) trigger dag got error when dag is zipped
[ https://issues.apache.org/jira/browse/AIRFLOW-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-4801: --- Description: I have a dags.zip laid out like below: {code:java} Archive: dags.zip extracting: jobs.zip extracting: libs.zip inflating: log4j.properties inflating: main.py # the spark-submit python file creating: resources/ creating: resources/result/ creating: resources/train/ inflating: resources/train/sale_count.parquet inflating: resources/words.txt inflating: salecount.py # a dag containing SparkSubmitOperator inflating: test.py # a very simple dag, like the airflow example {code} I always get an error when clicking `trigger dag` for test.py: {code:java} [Airflow error-page ASCII splash omitted] --- Node: dc09 --- Traceback (most recent call last): File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 2311, in wsgi_app response = self.full_dispatch_request() File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1834, in full_dispatch_request rv = self.handle_user_exception(e) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1737, in handle_user_exception reraise(exc_type, exc_value, tb) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/_compat.py", line 36, in reraise raise value File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1832, in full_dispatch_request rv = self.dispatch_request() File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1818, in dispatch_request return self.view_functions[rule.endpoint](**req.view_args) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner return self._run_view(f, *args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view return fn(self, *args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", line 275, in wrapper return f(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", line 322, in wrapper return f(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", line 1138, in trigger external_trigger=True File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/__init__.py", line 2925, in create_dagrun return self.get_dag().create_dagrun(run_id=run_id, AttributeError: 'NoneType' object has no attribute 'create_dagrun' {code} If I extract only test.py and delete dags.zip, `trigger dag` for test.py works fine.
If I extract all files from dags.zip and delete dags.zip, `trigger dag` for test.py still fails. was: I have dags.zip like below : {code:java} Archive: dags.zip extracting: jobs.zip extracting: libs.zip inflating: log4j.properties inflating: main.py creating: resources/ creating: resources/result/ creating: resources/train/ inflating: resources/train/sale_count.parquet inflating:
[jira] [Updated] (AIRFLOW-4801) trigger dag got error when dag is zipped
[ https://issues.apache.org/jira/browse/AIRFLOW-4801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kasim updated AIRFLOW-4801: --- Description: I have dags.zip like below : {code:java} Archive: dags.zip extracting: jobs.zip extracting: libs.zip inflating: log4j.properties inflating: main.py creating: resources/ creating: resources/result/ creating: resources/train/ inflating: resources/train/sale_count.parquet inflating: resources/words.txt inflating: salecount.py inflating: test.py {code} Always got error when click `trigger dag` {code:java} / ( () ) \___ /( ( ( ) _)) ) )\ (( ( )() ) ( ) ) ((/ ( _( ) ( _) ) ( () ) ) ( ( ( (_) ((( ) .((_ ) . )_ ( ( )( ( )) ) . ) ( ) ( ( ( ( ) ( _ ( _) ). ) . ) ) ( ) ( ( ( ) ( ) ( )) ) _)( ) ) ) ( ( ( \ ) ((_ ( ) ( ) ) ) ) )) ( ) ( ( ( ( (_ ( ) ( _) ) ( ) ) ) ( ( ( ( ( ) (_ ) ) ) _) ) _( ( ) (( ( )(( _) _) _(_ ( (_ ) (_((__(_(__(( ( ( | ) ) ) )_))__))_)___) ((__)\\||lll|l||/// \_)) ( /(/ ( ) ) )\ ) (( ( ( | | ) ) )\ ) ( /(| / ( )) ) ) )) ) ( ( _(|)_) ) ( ||\(|(|)|/|| ) (|(||(||)) ( //|/l|||)|\\ \ ) (/ / // /|//\\ \ \ \ _) --- Node: dc09 --- Traceback (most recent call last): File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 2311, in wsgi_app response = self.full_dispatch_request() File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1834, in full_dispatch_request rv = self.handle_user_exception(e) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1737, in handle_user_exception reraise(exc_type, exc_value, tb) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/_compat.py", line 36, in reraise raise value File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1832, in full_dispatch_request rv = self.dispatch_request() File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1818, in dispatch_request return self.view_functions[rule.endpoint](**req.view_args) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner return self._run_view(f, *args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view return fn(self, *args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", line 275, in wrapper return f(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", line 322, in wrapper return f(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", line 1138, in trigger external_trigger=True File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/__init__.py", line 2925, in create_dagrun return self.get_dag().create_dagrun(run_id=run_id, AttributeError: 'NoneType' object has no attribute 'create_dagrun' {code} If I extracted test.py and delete dags.zip , `trigger dag ` would be fine . If I extract all file in dags.zip and delete dags.zip , got wrong too . 
was: I found this only happened with dags inside zip file , after I extract them, this problem gone . {code:java} / ( () ) \___ /( ( ( ) _)) ) )\ (( ( )() ) ( ) ) ((/ ( _( ) ( _) ) ( () ) ) ( ( ( (_) ((( ) .((_ ) . )_ ( ( )( ( )) ) .
[jira] [Created] (AIRFLOW-4801) trigger dag got error when dag is zipped
kasim created AIRFLOW-4801: -- Summary: trigger dag got error when dag is zipped Key: AIRFLOW-4801 URL: https://issues.apache.org/jira/browse/AIRFLOW-4801 Project: Apache Airflow Issue Type: Bug Components: DagRun Affects Versions: 1.10.3 Reporter: kasim I found this only happens with dags inside a zip file; after I extract them, the problem is gone. {code:java} [Airflow error-page ASCII splash omitted] --- Node: dc09 --- Traceback (most recent call last): File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 2311, in wsgi_app response = self.full_dispatch_request() File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1834, in full_dispatch_request rv = self.handle_user_exception(e) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1737, in handle_user_exception reraise(exc_type, exc_value, tb) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/_compat.py", line 36, in reraise raise value File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1832, in full_dispatch_request rv = self.dispatch_request() File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask/app.py", line 1818, in dispatch_request return self.view_functions[rule.endpoint](**req.view_args) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", line 69, in inner return self._run_view(f, *args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_admin/base.py", line 368, in _run_view return fn(self, *args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/flask_login/utils.py", line 258, in decorated_view return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", line 275, in wrapper return f(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/utils.py", line 322, in wrapper return f(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/www/views.py", line 1138, in trigger external_trigger=True File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper return func(*args, **kwargs) File "/opt/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/__init__.py", line 2925, in create_dagrun return self.get_dag().create_dagrun(run_id=run_id, AttributeError: 'NoneType' object has no attribute 'create_dagrun' {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
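The traceback above ends in `self.get_dag()` returning None, i.e. the webserver could not rebuild the DAG object for the zipped file at trigger time. A quick way to see what Airflow can actually import from the dags folder (packaged .zip DAGs included) is to build a DagBag the same way the webserver and scheduler do. This is a diagnostic sketch, assuming Airflow 1.10.x; the folder path comes from the report, and the dag_id is hypothetical (whatever test.py defines).
{code:python}
# Diagnostic sketch under the assumptions above.
from airflow.models import DagBag

# Scan the dags folder, including packaged DAGs inside dags.zip.
bag = DagBag(dag_folder="/opt/airflow/dags")

print(bag.import_errors)            # import failures, keyed by file (zip members included)
print(sorted(bag.dags))             # dag_ids that were actually registered
print(bag.get_dag("my_test_dag"))   # hypothetical dag_id from test.py; None here would
                                    # line up with the 'create_dagrun' AttributeError
{code}
If the dag_id is registered here but the UI trigger still fails only while the file lives inside dags.zip, that would suggest the problem is in how the trigger view reloads packaged DAGs from their recorded location, which matches the behaviour this report describes.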