Repository: incubator-airflow
Updated Branches:
  refs/heads/master ce362c312 -> adcccfa26
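The patch below moves the optional-dependency operator tests out of tests/core.py into tests/operators/, and replaces the inline `if 'MySqlOperator' in dir(operators):` guards with a `skipUnlessImported` decorator imported from `airflow.utils.tests`. That helper's implementation is not part of this diff; the following is only a rough sketch of the contract the new tests appear to rely on (the names follow the diff, the body is an assumption):

    # Rough sketch only -- the real helper lives in airflow.utils.tests
    # and is not shown in this patch.
    import importlib
    import unittest

    def skipUnlessImported(module, obj):
        """Skip the decorated test(s) unless `obj` is available in `module`."""
        try:
            imported = importlib.import_module(module)
        except ImportError:
            imported = None
        return unittest.skipUnless(
            obj in dir(imported),
            "Skipping test because {0} was not imported from {1}".format(
                obj, module))

Because `unittest.skipUnless` returns a decorator, this works on whole TestCase classes as used below, and the tests simply report as skipped when an optional dependency (MySQL, Postgres, Hive) is absent.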
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index ca8150b..4a79d0f 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,9 +1,9 @@
 from __future__ import absolute_import
 
 from .configuration import *
+from .contrib import *
 from .core import *
 from .jobs import *
 from .models import *
 from .operators import *
-from .contrib import *
 from .utils import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index af791e3..5e6a4fd 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from __future__ import print_function
 
 import doctest
@@ -289,7 +290,7 @@ class CoreTest(unittest.TestCase):
         assert hash(self.dag) != hash(dag_subclass)
 
     def test_time_sensor(self):
-        t = operators.TimeSensor(
+        t = operators.sensors.TimeSensor(
             task_id='time_sensor_check',
             target_time=time(0),
             dag=self.dag)
@@ -380,21 +381,22 @@ class CoreTest(unittest.TestCase):
         t.dry_run()
 
     def test_sqlite(self):
-        t = operators.SqliteOperator(
+        import airflow.operators.sqlite_operator
+        t = airflow.operators.sqlite_operator.SqliteOperator(
             task_id='time_sqlite',
             sql="CREATE TABLE IF NOT EXISTS unitest (dummy VARCHAR(20))",
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_timedelta_sensor(self):
-        t = operators.TimeDeltaSensor(
+        t = operators.sensors.TimeDeltaSensor(
             task_id='timedelta_sensor_check',
             delta=timedelta(seconds=2),
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor(self):
-        t = operators.ExternalTaskSensor(
+        t = operators.sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -402,7 +404,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor_delta(self):
-        t = operators.ExternalTaskSensor(
+        t = operators.sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check_delta',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -1077,97 +1079,6 @@ class WebLdapAuthTest(unittest.TestCase):
         session.close()
         configuration.conf.set("webserver", "authenticate", "False")
 
-
-if 'MySqlOperator' in dir(operators):
-    # Only testing if the operator is installed
-    class MySqlTest(unittest.TestCase):
-        def setUp(self):
-            configuration.test_mode()
-            args = {
-                'owner': 'airflow',
-                'mysql_conn_id': 'airflow_db',
-                'start_date': DEFAULT_DATE
-            }
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-
-        def mysql_operator_test(self):
-            sql = """
-            CREATE TABLE IF NOT EXISTS test_airflow (
-                dummy VARCHAR(50)
-            );
-            """
-            t = operators.MySqlOperator(
-                task_id='basic_mysql',
-                sql=sql,
-                mysql_conn_id='airflow_db',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def mysql_operator_test_multi(self):
-            sql = [
-                "TRUNCATE TABLE test_airflow",
-                "INSERT INTO test_airflow VALUES ('X')",
-            ]
-            t = operators.MySqlOperator(
-                task_id='mysql_operator_test_multi',
-                mysql_conn_id='airflow_db',
-                sql=sql,
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_mysql_to_mysql(self):
-            sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
-            t = operators.GenericTransfer(
-                task_id='test_m2m',
-                preoperator=[
-                    "DROP TABLE IF EXISTS test_mysql_to_mysql",
-                    "CREATE TABLE IF NOT EXISTS "
-                    "test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES"
-                ],
-                source_conn_id='airflow_db',
-                destination_conn_id='airflow_db',
-                destination_table="test_mysql_to_mysql",
-                sql=sql,
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_sql_sensor(self):
-            t = operators.SqlSensor(
-                task_id='sql_sensor_check',
-                conn_id='mysql_default',
-                sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-if 'PostgresOperator' in dir(operators):
-    # Only testing if the operator is installed
-    class PostgresTest(unittest.TestCase):
-        def setUp(self):
-            configuration.test_mode()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-
-        def postgres_operator_test(self):
-            sql = """
-            CREATE TABLE IF NOT EXISTS test_airflow (
-                dummy VARCHAR(50)
-            );
-            """
-            t = operators.PostgresOperator(
-                task_id='basic_postgres', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-            autocommitTask = operators.PostgresOperator(
-                task_id='basic_postgres_with_autocommit',
-                sql=sql,
-                dag=self.dag,
-                autocommit=True)
-            autocommitTask.run(
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE,
-                force=True)
-
 class FakeSession(object):
     def __init__(self):
         from requests import Response
@@ -1213,7 +1124,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_sensor(self):
-        sensor = operators.HttpSensor(
+        sensor = operators.sensors.HttpSensor(
             task_id='http_sensor_check',
             conn_id='http_default',
             endpoint='/search',
@@ -1235,15 +1146,6 @@ class FakeWebHDFSHook(object):
     def check_for_path(self, hdfs_path):
         return hdfs_path
 
-class WebHdfsSensorTest(unittest.TestCase):
-
-    @mock.patch('airflow.hooks.WebHDFSHook', FakeWebHDFSHook)
-    def test_poke(self):
-        s = operators.WebHdfsSensor(filepath='fakepath',
-                                    task_id='webhdfs_sensor_check',
-                                    owner='webhdfs')
-        assert s.poke({}) == 'fakepath'
-
 class ConnectionTest(unittest.TestCase):
     def setUp(self):
         configuration.test_mode()
@@ -1462,231 +1364,5 @@ class EmailSmtpTest(unittest.TestCase):
         assert not mock_smtp.called
         assert not mock_smtp_ssl.called
 
-
-if 'HiveOperator' in dir(operators):
-    class HiveServer2Test(unittest.TestCase):
-        def setUp(self):
-            configuration.test_mode()
-
-        def test_select_conn(self):
-            from airflow.hooks.hive_hooks import HiveServer2Hook
-            sql = "select 1"
-            hook = HiveServer2Hook()
-            hook.get_records(sql)
-
-        def test_multi_statements(self):
-            from airflow.hooks.hive_hooks import HiveServer2Hook
-            sqls = [
-                "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)",
-                "DROP TABLE test_multi_statements",
-            ]
-            hook = HiveServer2Hook()
-            hook.get_records(sqls)
-
-        def test_get_metastore_databases(self):
-            if six.PY2:
-                from airflow.hooks.hive_hooks import HiveMetastoreHook
-                hook = HiveMetastoreHook()
-                hook.get_databases()
-
-        def test_to_csv(self):
-            from airflow.hooks.hive_hooks import HiveServer2Hook
-            sql = "select 1"
-            hook = HiveServer2Hook()
-            hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
-
-if 'MySqlOperator' in dir(operators) and 'HiveOperator' in dir(operators):
-    class TransferTests(unittest.TestCase):
-        cluster = None
-
-        def setUp(self):
-            configuration.test_mode()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-
-        def test_clear(self):
-            self.dag.clear(start_date=DEFAULT_DATE, end_date=datetime.now())
-
-        def test_mysql_to_hive(self):
-            # import airflow.operators
-            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
-            sql = "SELECT * FROM baby_names LIMIT 1000;"
-            t = MySqlToHiveTransfer(
-                task_id='test_m2h',
-                mysql_conn_id='airflow_ci',
-                hive_cli_conn_id='beeline_default',
-                sql=sql,
-                hive_table='test_mysql_to_hive',
-                recreate=True,
-                delimiter=",",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_mysql_to_hive_partition(self):
-            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
-            sql = "SELECT * FROM baby_names LIMIT 1000;"
-            t = MySqlToHiveTransfer(
-                task_id='test_m2h',
-                mysql_conn_id='airflow_ci',
-                hive_cli_conn_id='beeline_default',
-                sql=sql,
-                hive_table='test_mysql_to_hive_part',
-                partition={'ds': DEFAULT_DATE_DS},
-                recreate=False,
-                create=True,
-                delimiter=",",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-if 'AIRFLOW_RUNALL_TESTS' in os.environ:
-
-    class HivePrestoTest(unittest.TestCase):
-
-        def setUp(self):
-            configuration.test_mode()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-            self.hql = """
-            USE airflow;
-            DROP TABLE IF EXISTS static_babynames_partitioned;
-            CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
-                state string,
-                year string,
-                name string,
-                gender string,
-                num int)
-            PARTITIONED BY (ds string);
-            INSERT OVERWRITE TABLE static_babynames_partitioned
-            PARTITION(ds='{{ ds }}')
-            SELECT state, year, name, gender, num FROM static_babynames;
-            """
-
-        def test_hive(self):
-            t = operators.HiveOperator(
-                task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_dryrun(self):
-            t = operators.HiveOperator(
-                task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.dry_run()
-
-        def test_beeline(self):
-            t = operators.HiveOperator(
-                task_id='beeline_hql', hive_cli_conn_id='beeline_default',
-                hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_presto(self):
-            sql = """
-            SELECT count(1) FROM airflow.static_babynames_partitioned;
-            """
-            t = operators.PrestoCheckOperator(
-                task_id='presto_check', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_presto_to_mysql(self):
-            t = operators.PrestoToMySqlTransfer(
-                task_id='presto_to_mysql_check',
-                sql="""
-                SELECT name, count(*) as ccount
-                FROM airflow.static_babynames
-                GROUP BY name
-                """,
-                mysql_table='test_static_babynames',
-                mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_presto_to_mysql(self):
-            t = operators.PrestoToMySqlTransfer(
-                task_id='presto_to_mysql_check',
-                sql="""
-                SELECT name, count(*) as ccount
-                FROM airflow.static_babynames
-                GROUP BY name
-                """,
-                mysql_table='test_static_babynames',
-                mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hdfs_sensor(self):
-            t = operators.HdfsSensor(
-                task_id='hdfs_sensor_check',
-                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_webhdfs_sensor(self):
-            t = operators.WebHdfsSensor(
-                task_id='webhdfs_sensor_check',
-                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
-                timeout=120,
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_sql_sensor(self):
-            t = operators.SqlSensor(
-                task_id='hdfs_sensor_check',
-                conn_id='presto_default',
-                sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_stats(self):
-            t = operators.HiveStatsCollectionOperator(
-                task_id='hive_stats_check',
-                table="airflow.static_babynames_partitioned",
-                partition={'ds': DEFAULT_DATE_DS},
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_partition_sensor(self):
-            t = operators.HivePartitionSensor(
-                task_id='hive_partition_check',
-                table='airflow.static_babynames_partitioned',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_metastore_sql_sensor(self):
-            t = operators.MetastorePartitionSensor(
-                task_id='hive_partition_check',
-                table='airflow.static_babynames_partitioned',
-                partition_name='ds={}'.format(DEFAULT_DATE_DS),
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive2samba(self):
-            if 'Hive2SambaOperator' in dir(operators):
-                t = operators.Hive2SambaOperator(
-                    task_id='hive2samba_check',
-                    samba_conn_id='tableau_samba',
-                    hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
-                    destination_filepath='test_airflow.csv',
-                    dag=self.dag)
-                t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_to_mysql(self):
-            t = operators.HiveToMySqlTransfer(
-                mysql_conn_id='airflow_db',
-                task_id='hive_to_mysql_check',
-                create=True,
-                sql="""
-                SELECT name
-                FROM airflow.static_babynames
-                LIMIT 100
-                """,
-                mysql_table='test_static_babynames',
-                mysql_preoperator=[
-                    'DROP TABLE IF EXISTS test_static_babynames;',
-                    'CREATE TABLE test_static_babynames (name VARCHAR(500))',
-                ],
-                dag=self.dag)
-            t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py
index 98a17a7..63ff2a0 100644
--- a/tests/operators/__init__.py
+++ b/tests/operators/__init__.py
@@ -1,2 +1,19 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from .docker_operator import *
 from .subdag_operator import *
+from .operators import *
+from .sensors import *
+from .hive_operator import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
new file mode 100644
index 0000000..202adcf
--- /dev/null
+++ b/tests/operators/hive_operator.py
@@ -0,0 +1,209 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import datetime
+import os
+import unittest
+import six
+
+from airflow import DAG, configuration, operators, utils
+configuration.test_mode()
+
+import os
+import unittest
+
+
+DEFAULT_DATE = datetime.datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+
+
+if 'AIRFLOW_RUNALL_TESTS' in os.environ:
+
+    import airflow.hooks.hive_hooks
+    import airflow.operators.presto_to_mysql
+
+    class HiveServer2Test(unittest.TestCase):
+        def setUp(self):
+            configuration.test_mode()
+
+        def test_select_conn(self):
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            sql = "select 1"
+            hook = HiveServer2Hook()
+            hook.get_records(sql)
+
+        def test_multi_statements(self):
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            sqls = [
+                "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)",
+                "DROP TABLE test_multi_statements",
+            ]
+            hook = HiveServer2Hook()
+            hook.get_records(sqls)
+
+        def test_get_metastore_databases(self):
+            if six.PY2:
+                from airflow.hooks.hive_hooks import HiveMetastoreHook
+                hook = HiveMetastoreHook()
+                hook.get_databases()
+
+        def test_to_csv(self):
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            sql = "select 1"
+            hook = HiveServer2Hook()
+            hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
+
+    class HivePrestoTest(unittest.TestCase):
+
+        def setUp(self):
+            configuration.test_mode()
+            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+            dag = DAG('test_dag_id', default_args=args)
+            self.dag = dag
+            self.hql = """
+            USE airflow;
+            DROP TABLE IF EXISTS static_babynames_partitioned;
+            CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
+                state string,
+                year string,
+                name string,
+                gender string,
+                num int)
+            PARTITIONED BY (ds string);
+            INSERT OVERWRITE TABLE static_babynames_partitioned
+            PARTITION(ds='{{ ds }}')
+            SELECT state, year, name, gender, num FROM static_babynames;
+            """
+
+        def test_hive(self):
+            import airflow.operators.hive_operator
+            t = operators.hive_operator.HiveOperator(
+                task_id='basic_hql', hql=self.hql, dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_dryrun(self):
+            import airflow.operators.hive_operator
+            t = operators.hive_operator.HiveOperator(
+                task_id='basic_hql', hql=self.hql, dag=self.dag)
+            t.dry_run()
+
+        def test_beeline(self):
+            import airflow.operators.hive_operator
+            t = operators.hive_operator.HiveOperator(
+                task_id='beeline_hql', hive_cli_conn_id='beeline_default',
+                hql=self.hql, dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_presto(self):
+            sql = """
+            SELECT count(1) FROM airflow.static_babynames_partitioned;
+            """
+            import airflow.operators.presto_check_operator
+            t = operators.presto_check_operator.PrestoCheckOperator(
+                task_id='presto_check', sql=sql, dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_presto_to_mysql(self):
+            import airflow.operators.presto_to_mysql
+            t = operators.presto_to_mysql.PrestoToMySqlTransfer(
+                task_id='presto_to_mysql_check',
+                sql="""
+                SELECT name, count(*) as ccount
+                FROM airflow.static_babynames
+                GROUP BY name
+                """,
+                mysql_table='test_static_babynames',
+                mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hdfs_sensor(self):
+            t = operators.sensors.HdfsSensor(
+                task_id='hdfs_sensor_check',
+                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_webhdfs_sensor(self):
+            t = operators.sensors.WebHdfsSensor(
+                task_id='webhdfs_sensor_check',
+                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
+                timeout=120,
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_sql_sensor(self):
+            t = operators.sensors.SqlSensor(
+                task_id='hdfs_sensor_check',
+                conn_id='presto_default',
+                sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_stats(self):
+            import airflow.operators.hive_stats_operator
+            t = operators.hive_stats_operator.HiveStatsCollectionOperator(
+                task_id='hive_stats_check',
+                table="airflow.static_babynames_partitioned",
+                partition={'ds': DEFAULT_DATE_DS},
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_partition_sensor(self):
+            t = operators.sensors.HivePartitionSensor(
+                task_id='hive_partition_check',
+                table='airflow.static_babynames_partitioned',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_metastore_sql_sensor(self):
+            t = operators.sensors.MetastorePartitionSensor(
+                task_id='hive_partition_check',
+                table='airflow.static_babynames_partitioned',
+                partition_name='ds={}'.format(DEFAULT_DATE_DS),
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive2samba(self):
+            import airflow.operators.hive_to_samba_operator
+            t = operators.hive_to_samba_operator.Hive2SambaOperator(
+                task_id='hive2samba_check',
+                samba_conn_id='tableau_samba',
+                hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
+                destination_filepath='test_airflow.csv',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_to_mysql(self):
+            import airflow.operators.hive_to_mysql
+            t = operators.hive_to_mysql.HiveToMySqlTransfer(
+                mysql_conn_id='airflow_db',
+                task_id='hive_to_mysql_check',
+                create=True,
+                sql="""
+                SELECT name
+                FROM airflow.static_babynames
+                LIMIT 100
+                """,
+                mysql_table='test_static_babynames',
+                mysql_preoperator=[
+                    'DROP TABLE IF EXISTS test_static_babynames;',
+                    'CREATE TABLE test_static_babynames (name VARCHAR(500))',
+                ],
+                dag=self.dag)
+            t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
new file mode 100644
index 0000000..d9bc0c2
--- /dev/null
+++ b/tests/operators/operators.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import datetime
+import os
+import unittest
+import six
+
+from airflow import DAG, configuration, operators, utils
+from airflow.utils.tests import skipUnlessImported
+configuration.test_mode()
+
+import os
+import unittest
+
+
+DEFAULT_DATE = datetime.datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+TEST_DAG_ID = 'unit_test_dag'
+
+
+@skipUnlessImported('airflow.operators.mysql_operator', 'MySqlOperator')
+class MySqlTest(unittest.TestCase):
+    def setUp(self):
+        configuration.test_mode()
+        args = {
+            'owner': 'airflow',
+            'mysql_conn_id': 'airflow_db',
+            'start_date': DEFAULT_DATE
+        }
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def mysql_operator_test(self):
+        sql = """
+        CREATE TABLE IF NOT EXISTS test_airflow (
+            dummy VARCHAR(50)
+        );
+        """
+        import airflow.operators.mysql_operator
+        t = operators.mysql_operator.MySqlOperator(
+            task_id='basic_mysql',
+            sql=sql,
+            mysql_conn_id='airflow_db',
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def mysql_operator_test_multi(self):
+        sql = [
+            "TRUNCATE TABLE test_airflow",
+            "INSERT INTO test_airflow VALUES ('X')",
+        ]
+        import airflow.operators.mysql_operator
+        t = operators.mysql_operator.MySqlOperator(
+            task_id='mysql_operator_test_multi',
+            mysql_conn_id='airflow_db',
+            sql=sql, dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_mysql_to_mysql(self):
+        sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
+        import airflow.operators.generic_transfer
+        t = operators.generic_transfer.GenericTransfer(
+            task_id='test_m2m',
+            preoperator=[
+                "DROP TABLE IF EXISTS test_mysql_to_mysql",
+                "CREATE TABLE IF NOT EXISTS "
+                "test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES"
+            ],
+            source_conn_id='airflow_db',
+            destination_conn_id='airflow_db',
+            destination_table="test_mysql_to_mysql",
+            sql=sql,
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_sql_sensor(self):
+        t = operators.sensors.SqlSensor(
+            task_id='sql_sensor_check',
+            conn_id='mysql_default',
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
+class PostgresTest(unittest.TestCase):
+    def setUp(self):
+        configuration.test_mode()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def postgres_operator_test(self):
+        sql = """
+        CREATE TABLE IF NOT EXISTS test_airflow (
+            dummy VARCHAR(50)
+        );
+        """
+        import airflow.operators.postgres_operator
+        t = operators.postgres_operator.PostgresOperator(
+            task_id='basic_postgres', sql=sql, dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        autocommitTask = operators.postgres_operator.PostgresOperator(
+            task_id='basic_postgres_with_autocommit',
+            sql=sql,
+            dag=self.dag,
+            autocommit=True)
+        autocommitTask.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            force=True)
+
+
+@skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator')
+@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
+class TransferTests(unittest.TestCase):
+    cluster = None
+
+    def setUp(self):
+        configuration.test_mode()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def test_clear(self):
+        self.dag.clear(
+            start_date=DEFAULT_DATE,
+            end_date=datetime.datetime.now())
+
+    def test_mysql_to_hive(self):
+        # import airflow.operators
+        from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+        sql = "SELECT * FROM baby_names LIMIT 1000;"
+        t = MySqlToHiveTransfer(
+            task_id='test_m2h',
+            mysql_conn_id='airflow_ci',
+            hive_cli_conn_id='beeline_default',
+            sql=sql,
+            hive_table='test_mysql_to_hive',
+            recreate=True,
+            delimiter=",",
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_mysql_to_hive_partition(self):
+        from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+        sql = "SELECT * FROM baby_names LIMIT 1000;"
+        t = MySqlToHiveTransfer(
+            task_id='test_m2h',
+            mysql_conn_id='airflow_ci',
+            hive_cli_conn_id='beeline_default',
+            sql=sql,
+            hive_table='test_mysql_to_hive_part',
+            partition={'ds': DEFAULT_DATE_DS},
+            recreate=False,
+            create=True,
+            delimiter=",",
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/sensor.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensor.py b/tests/operators/sensor.py
deleted file mode 100644
index 45d4b81..0000000
--- a/tests/operators/sensor.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import unittest
-
-from airflow.operators import HttpSensor
-from airflow.exceptions import AirflowException
-
-
-class HttpSensorTests(unittest.TestCase):
-
-    def test_poke_exception(self):
-        """
-        Exception occurs in poke function should not be ignored.
-        """
-        def resp_check(resp):
-            raise AirflowException('AirflowException raised here!')
-
-        task = HttpSensor(
-            task_id='http_sensor_poke_exception',
-            http_conn_id='http_default',
-            endpoint='',
-            params={},
-            response_check=resp_check,
-            poke_interval=5)
-        with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
-            task.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
new file mode 100644
index 0000000..025790e
--- /dev/null
+++ b/tests/operators/sensors.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+
+from airflow.operators.sensors import HttpSensor
+from airflow.exceptions import AirflowException
+
+
+class HttpSensorTests(unittest.TestCase):
+
+    def test_poke_exception(self):
+        """
+        Exception occurs in poke function should not be ignored.
+        """
+        def resp_check(resp):
+            raise AirflowException('AirflowException raised here!')
+
+        task = HttpSensor(
+            task_id='http_sensor_poke_exception',
+            http_conn_id='http_default',
+            endpoint='',
+            params={},
+            response_check=resp_check,
+            poke_interval=5)
+        with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
+            task.execute(None)
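For reference, the import idiom the updated tests standardize on: operator and sensor classes are resolved through the submodule that defines them rather than through names re-exported at the airflow.operators package level. A minimal usage sketch under that assumption (the task ids and SQL here are illustrative, not taken from the patch):

    # Importing the submodule attaches it to the parent package, so the
    # qualified attribute lookup through `operators` resolves afterwards.
    import datetime

    import airflow.operators.sqlite_operator
    from airflow import operators
    from airflow.operators.sensors import TimeDeltaSensor

    t = operators.sqlite_operator.SqliteOperator(
        task_id='sqlite_example',  # illustrative task id
        sql="CREATE TABLE IF NOT EXISTS example (dummy VARCHAR(20))")
    s = TimeDeltaSensor(
        task_id='timedelta_example',  # illustrative task id
        delta=datetime.timedelta(seconds=2))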