Repository: incubator-airflow
Updated Branches:
  refs/heads/master ce362c312 -> adcccfa26


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index ca8150b..4a79d0f 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -1,9 +1,9 @@
 from __future__ import absolute_import
 
 from .configuration import *
+from .contrib import *
 from .core import *
 from .jobs import *
 from .models import *
 from .operators import *
-from .contrib import *
 from .utils import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index af791e3..5e6a4fd 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 from __future__ import print_function
 
 import doctest
@@ -289,7 +290,7 @@ class CoreTest(unittest.TestCase):
         assert hash(self.dag) != hash(dag_subclass)
 
     def test_time_sensor(self):
-        t = operators.TimeSensor(
+        t = operators.sensors.TimeSensor(
             task_id='time_sensor_check',
             target_time=time(0),
             dag=self.dag)
@@ -380,21 +381,22 @@ class CoreTest(unittest.TestCase):
         t.dry_run()
 
     def test_sqlite(self):
-        t = operators.SqliteOperator(
+        import airflow.operators.sqlite_operator
+        t = airflow.operators.sqlite_operator.SqliteOperator(
             task_id='time_sqlite',
             sql="CREATE TABLE IF NOT EXISTS unitest (dummy VARCHAR(20))",
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_timedelta_sensor(self):
-        t = operators.TimeDeltaSensor(
+        t = operators.sensors.TimeDeltaSensor(
             task_id='timedelta_sensor_check',
             delta=timedelta(seconds=2),
             dag=self.dag)
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor(self):
-        t = operators.ExternalTaskSensor(
+        t = operators.sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -402,7 +404,7 @@ class CoreTest(unittest.TestCase):
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
 
     def test_external_task_sensor_delta(self):
-        t = operators.ExternalTaskSensor(
+        t = operators.sensors.ExternalTaskSensor(
             task_id='test_external_task_sensor_check_delta',
             external_dag_id=TEST_DAG_ID,
             external_task_id='time_sensor_check',
@@ -1077,97 +1079,6 @@ class WebLdapAuthTest(unittest.TestCase):
         session.close()
         configuration.conf.set("webserver", "authenticate", "False")
 
-
-if 'MySqlOperator' in dir(operators):
-    # Only testing if the operator is installed
-    class MySqlTest(unittest.TestCase):
-        def setUp(self):
-            configuration.test_mode()
-            args = {
-                'owner': 'airflow',
-                'mysql_conn_id': 'airflow_db',
-                'start_date': DEFAULT_DATE
-            }
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-
-        def mysql_operator_test(self):
-            sql = """
-            CREATE TABLE IF NOT EXISTS test_airflow (
-                dummy VARCHAR(50)
-            );
-            """
-            t = operators.MySqlOperator(
-                task_id='basic_mysql',
-                sql=sql,
-                mysql_conn_id='airflow_db',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def mysql_operator_test_multi(self):
-            sql = [
-                "TRUNCATE TABLE test_airflow",
-                "INSERT INTO test_airflow VALUES ('X')",
-            ]
-            t = operators.MySqlOperator(
-                task_id='mysql_operator_test_multi',
-                mysql_conn_id='airflow_db',
-                sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_mysql_to_mysql(self):
-            sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
-            t = operators.GenericTransfer(
-                task_id='test_m2m',
-                preoperator=[
-                    "DROP TABLE IF EXISTS test_mysql_to_mysql",
-                    "CREATE TABLE IF NOT EXISTS "
-                    "test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES"
-                ],
-                source_conn_id='airflow_db',
-                destination_conn_id='airflow_db',
-                destination_table="test_mysql_to_mysql",
-                sql=sql,
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_sql_sensor(self):
-            t = operators.SqlSensor(
-                task_id='sql_sensor_check',
-                conn_id='mysql_default',
-                sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-if 'PostgresOperator' in dir(operators):
-    # Only testing if the operator is installed
-    class PostgresTest(unittest.TestCase):
-        def setUp(self):
-            configuration.test_mode()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-
-        def postgres_operator_test(self):
-            sql = """
-            CREATE TABLE IF NOT EXISTS test_airflow (
-                dummy VARCHAR(50)
-            );
-            """
-            t = operators.PostgresOperator(
-                task_id='basic_postgres', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-            autocommitTask = operators.PostgresOperator(
-                task_id='basic_postgres_with_autocommit',
-                sql=sql,
-                dag=self.dag,
-                autocommit=True)
-            autocommitTask.run(
-                start_date=DEFAULT_DATE,
-                end_date=DEFAULT_DATE,
-                force=True)
-
 class FakeSession(object):
     def __init__(self):
         from requests import Response
@@ -1213,7 +1124,7 @@ class HttpOpSensorTest(unittest.TestCase):
 
     @mock.patch('requests.Session', FakeSession)
     def test_sensor(self):
-        sensor = operators.HttpSensor(
+        sensor = operators.sensors.HttpSensor(
             task_id='http_sensor_check',
             conn_id='http_default',
             endpoint='/search',
@@ -1235,15 +1146,6 @@ class FakeWebHDFSHook(object):
     def check_for_path(self, hdfs_path):
         return hdfs_path
 
-class WebHdfsSensorTest(unittest.TestCase):
-
-    @mock.patch('airflow.hooks.WebHDFSHook', FakeWebHDFSHook)
-    def test_poke(self):
-        s = operators.WebHdfsSensor(filepath='fakepath',
-                                    task_id='webhdfs_sensor_check',
-                                    owner='webhdfs')
-        assert s.poke({}) == 'fakepath'
-
 class ConnectionTest(unittest.TestCase):
     def setUp(self):
         configuration.test_mode()
@@ -1462,231 +1364,5 @@ class EmailSmtpTest(unittest.TestCase):
         assert not mock_smtp.called
         assert not mock_smtp_ssl.called
 
-
-if 'HiveOperator' in dir(operators):
-    class HiveServer2Test(unittest.TestCase):
-        def setUp(self):
-            configuration.test_mode()
-
-        def test_select_conn(self):
-            from airflow.hooks.hive_hooks import HiveServer2Hook
-            sql = "select 1"
-            hook = HiveServer2Hook()
-            hook.get_records(sql)
-
-        def test_multi_statements(self):
-            from airflow.hooks.hive_hooks import HiveServer2Hook
-            sqls = [
-                "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)",
-                "DROP TABLE test_multi_statements",
-            ]
-            hook = HiveServer2Hook()
-            hook.get_records(sqls)
-
-        def test_get_metastore_databases(self):
-            if six.PY2:
-                from airflow.hooks.hive_hooks import HiveMetastoreHook
-                hook = HiveMetastoreHook()
-                hook.get_databases()
-
-        def test_to_csv(self):
-            from airflow.hooks.hive_hooks import HiveServer2Hook
-            sql = "select 1"
-            hook = HiveServer2Hook()
-            hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
-
-if 'MySqlOperator' in dir(operators) and 'HiveOperator' in dir(operators):
-    class TransferTests(unittest.TestCase):
-        cluster = None
-
-        def setUp(self):
-            configuration.test_mode()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-
-        def test_clear(self):
-            self.dag.clear(start_date=DEFAULT_DATE, end_date=datetime.now())
-
-        def test_mysql_to_hive(self):
-            # import airflow.operators
-            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
-            sql = "SELECT * FROM baby_names LIMIT 1000;"
-            t = MySqlToHiveTransfer(
-                task_id='test_m2h',
-                mysql_conn_id='airflow_ci',
-                hive_cli_conn_id='beeline_default',
-                sql=sql,
-                hive_table='test_mysql_to_hive',
-                recreate=True,
-                delimiter=",",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_mysql_to_hive_partition(self):
-            from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
-            sql = "SELECT * FROM baby_names LIMIT 1000;"
-            t = MySqlToHiveTransfer(
-                task_id='test_m2h',
-                mysql_conn_id='airflow_ci',
-                hive_cli_conn_id='beeline_default',
-                sql=sql,
-                hive_table='test_mysql_to_hive_part',
-                partition={'ds': DEFAULT_DATE_DS},
-                recreate=False,
-                create=True,
-                delimiter=",",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-if 'AIRFLOW_RUNALL_TESTS' in os.environ:
-
-    class HivePrestoTest(unittest.TestCase):
-
-        def setUp(self):
-            configuration.test_mode()
-            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
-            dag = DAG(TEST_DAG_ID, default_args=args)
-            self.dag = dag
-            self.hql = """
-            USE airflow;
-            DROP TABLE IF EXISTS static_babynames_partitioned;
-            CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
-                state string,
-                year string,
-                name string,
-                gender string,
-                num int)
-            PARTITIONED BY (ds string);
-            INSERT OVERWRITE TABLE static_babynames_partitioned
-                PARTITION(ds='{{ ds }}')
-            SELECT state, year, name, gender, num FROM static_babynames;
-            """
-
-        def test_hive(self):
-            t = operators.HiveOperator(
-                task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_dryrun(self):
-            t = operators.HiveOperator(
-                task_id='basic_hql', hql=self.hql, dag=self.dag)
-            t.dry_run()
-
-        def test_beeline(self):
-            t = operators.HiveOperator(
-                task_id='beeline_hql', hive_cli_conn_id='beeline_default',
-                hql=self.hql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_presto(self):
-            sql = """
-            SELECT count(1) FROM airflow.static_babynames_partitioned;
-            """
-            t = operators.PrestoCheckOperator(
-                task_id='presto_check', sql=sql, dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_presto_to_mysql(self):
-            t = operators.PrestoToMySqlTransfer(
-                task_id='presto_to_mysql_check',
-                sql="""
-                SELECT name, count(*) as ccount
-                FROM airflow.static_babynames
-                GROUP BY name
-                """,
-                mysql_table='test_static_babynames',
-                mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_presto_to_mysql(self):
-            t = operators.PrestoToMySqlTransfer(
-                task_id='presto_to_mysql_check',
-                sql="""
-                SELECT name, count(*) as ccount
-                FROM airflow.static_babynames
-                GROUP BY name
-                """,
-                mysql_table='test_static_babynames',
-                mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hdfs_sensor(self):
-            t = operators.HdfsSensor(
-                task_id='hdfs_sensor_check',
-                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_webhdfs_sensor(self):
-            t = operators.WebHdfsSensor(
-                task_id='webhdfs_sensor_check',
-                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
-                timeout=120,
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_sql_sensor(self):
-            t = operators.SqlSensor(
-                task_id='hdfs_sensor_check',
-                conn_id='presto_default',
-                sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_stats(self):
-            t = operators.HiveStatsCollectionOperator(
-                task_id='hive_stats_check',
-                table="airflow.static_babynames_partitioned",
-                partition={'ds': DEFAULT_DATE_DS},
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_partition_sensor(self):
-            t = operators.HivePartitionSensor(
-                task_id='hive_partition_check',
-                table='airflow.static_babynames_partitioned',
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_metastore_sql_sensor(self):
-            t = operators.MetastorePartitionSensor(
-                task_id='hive_partition_check',
-                table='airflow.static_babynames_partitioned',
-                partition_name='ds={}'.format(DEFAULT_DATE_DS),
-                dag=self.dag)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive2samba(self):
-            if 'Hive2SambaOperator' in dir(operators):
-                t = operators.Hive2SambaOperator(
-                    task_id='hive2samba_check',
-                    samba_conn_id='tableau_samba',
-                    hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
-                    destination_filepath='test_airflow.csv',
-                    dag=self.dag)
-                t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
-        def test_hive_to_mysql(self):
-            t = operators.HiveToMySqlTransfer(
-                mysql_conn_id='airflow_db',
-                task_id='hive_to_mysql_check',
-                create=True,
-                sql="""
-                SELECT name
-                FROM airflow.static_babynames
-                LIMIT 100
-                """,
-                mysql_table='test_static_babynames',
-                mysql_preoperator=[
-                    'DROP TABLE IF EXISTS test_static_babynames;',
-                    'CREATE TABLE test_static_babynames (name VARCHAR(500))',
-                ],
-                dag=self.dag)
-            t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
-            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
-
 if __name__ == '__main__':
     unittest.main()

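The hunks above replace lookups on the flat airflow.operators namespace with two patterns: sensors are referenced through the operators.sensors submodule, and other operators are imported from their concrete module right before use, presumably so that a missing optional dependency only surfaces in the test that actually needs it. A minimal sketch of the two styles together (module paths as in the diff; the DAG and task values are illustrative):

    from datetime import datetime, timedelta
    from airflow import DAG, operators
    import airflow.operators.sqlite_operator

    dag = DAG('example_dag', default_args={'owner': 'airflow',
                                           'start_date': datetime(2015, 1, 1)})

    # Sensors are now reached through the operators.sensors submodule.
    sensor = operators.sensors.TimeDeltaSensor(
        task_id='timedelta_sensor_check',
        delta=timedelta(seconds=2),
        dag=dag)

    # Other operators are imported from their concrete module and the class is
    # referenced through that module, instead of the old flat namespace.
    sqlite_task = airflow.operators.sqlite_operator.SqliteOperator(
        task_id='time_sqlite',
        sql="CREATE TABLE IF NOT EXISTS unitest (dummy VARCHAR(20))",
        dag=dag)
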
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/__init__.py
----------------------------------------------------------------------
diff --git a/tests/operators/__init__.py b/tests/operators/__init__.py
index 98a17a7..63ff2a0 100644
--- a/tests/operators/__init__.py
+++ b/tests/operators/__init__.py
@@ -1,2 +1,19 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from .docker_operator import *
 from .subdag_operator import *
+from .operators import *
+from .sensors import *
+from .hive_operator import *

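These star imports re-export the TestCase classes defined in the new per-area modules from the tests.operators package, so a runner that only imports the package (as tests/__init__.py does above) still collects them. An illustrative check, assuming a configured Airflow test environment:

    # Illustrative only: classes defined in sensors.py / operators.py /
    # hive_operator.py become attributes of the package after the star imports.
    import tests.operators
    assert hasattr(tests.operators, 'HttpSensorTests')

    # They can also be run directly, e.g.:
    #   python -m unittest tests.operators.sensors
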
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/hive_operator.py b/tests/operators/hive_operator.py
new file mode 100644
index 0000000..202adcf
--- /dev/null
+++ b/tests/operators/hive_operator.py
@@ -0,0 +1,209 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import datetime
+import os
+import unittest
+import six
+
+from airflow import DAG, configuration, operators, utils
+configuration.test_mode()
+
+import os
+import unittest
+
+
+DEFAULT_DATE = datetime.datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+
+
+if 'AIRFLOW_RUNALL_TESTS' in os.environ:
+
+    import airflow.hooks.hive_hooks
+    import airflow.operators.presto_to_mysql
+
+    class HiveServer2Test(unittest.TestCase):
+        def setUp(self):
+            configuration.test_mode()
+
+        def test_select_conn(self):
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            sql = "select 1"
+            hook = HiveServer2Hook()
+            hook.get_records(sql)
+
+        def test_multi_statements(self):
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            sqls = [
+                "CREATE TABLE IF NOT EXISTS test_multi_statements (i INT)",
+                "DROP TABLE test_multi_statements",
+            ]
+            hook = HiveServer2Hook()
+            hook.get_records(sqls)
+
+        def test_get_metastore_databases(self):
+            if six.PY2:
+                from airflow.hooks.hive_hooks import HiveMetastoreHook
+                hook = HiveMetastoreHook()
+                hook.get_databases()
+
+        def test_to_csv(self):
+            from airflow.hooks.hive_hooks import HiveServer2Hook
+            sql = "select 1"
+            hook = HiveServer2Hook()
+            hook.to_csv(hql=sql, csv_filepath="/tmp/test_to_csv")
+
+    class HivePrestoTest(unittest.TestCase):
+
+        def setUp(self):
+            configuration.test_mode()
+            args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+            dag = DAG('test_dag_id', default_args=args)
+            self.dag = dag
+            self.hql = """
+            USE airflow;
+            DROP TABLE IF EXISTS static_babynames_partitioned;
+            CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
+                state string,
+                year string,
+                name string,
+                gender string,
+                num int)
+            PARTITIONED BY (ds string);
+            INSERT OVERWRITE TABLE static_babynames_partitioned
+                PARTITION(ds='{{ ds }}')
+            SELECT state, year, name, gender, num FROM static_babynames;
+            """
+
+        def test_hive(self):
+            import airflow.operators.hive_operator
+            t = operators.hive_operator.HiveOperator(
+                task_id='basic_hql', hql=self.hql, dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_dryrun(self):
+            import airflow.operators.hive_operator
+            t = operators.hive_operator.HiveOperator(
+                task_id='basic_hql', hql=self.hql, dag=self.dag)
+            t.dry_run()
+
+        def test_beeline(self):
+            import airflow.operators.hive_operator
+            t = operators.hive_operator.HiveOperator(
+                task_id='beeline_hql', hive_cli_conn_id='beeline_default',
+                hql=self.hql, dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_presto(self):
+            sql = """
+            SELECT count(1) FROM airflow.static_babynames_partitioned;
+            """
+            import airflow.operators.presto_check_operator
+            t = operators.presto_check_operator.PrestoCheckOperator(
+                task_id='presto_check', sql=sql, dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_presto_to_mysql(self):
+            import airflow.operators.presto_to_mysql
+            t = operators.presto_to_mysql.PrestoToMySqlTransfer(
+                task_id='presto_to_mysql_check',
+                sql="""
+                SELECT name, count(*) as ccount
+                FROM airflow.static_babynames
+                GROUP BY name
+                """,
+                mysql_table='test_static_babynames',
+                mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hdfs_sensor(self):
+            t = operators.sensors.HdfsSensor(
+                task_id='hdfs_sensor_check',
+                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_webhdfs_sensor(self):
+            t = operators.sensors.WebHdfsSensor(
+                task_id='webhdfs_sensor_check',
+                filepath='hdfs://user/hive/warehouse/airflow.db/static_babynames',
+                timeout=120,
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_sql_sensor(self):
+            t = operators.sensors.SqlSensor(
+                task_id='hdfs_sensor_check',
+                conn_id='presto_default',
+                sql="SELECT 'x' FROM airflow.static_babynames LIMIT 1;",
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_stats(self):
+            import airflow.operators.hive_stats_operator
+            t = operators.hive_stats_operator.HiveStatsCollectionOperator(
+                task_id='hive_stats_check',
+                table="airflow.static_babynames_partitioned",
+                partition={'ds': DEFAULT_DATE_DS},
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_partition_sensor(self):
+            t = operators.sensors.HivePartitionSensor(
+                task_id='hive_partition_check',
+                table='airflow.static_babynames_partitioned',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_metastore_sql_sensor(self):
+            t = operators.sensors.MetastorePartitionSensor(
+                task_id='hive_partition_check',
+                table='airflow.static_babynames_partitioned',
+                partition_name='ds={}'.format(DEFAULT_DATE_DS),
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive2samba(self):
+            import airflow.operators.hive_to_samba_operator
+            t = operators.hive_to_samba_operator.Hive2SambaOperator(
+                task_id='hive2samba_check',
+                samba_conn_id='tableau_samba',
+                hql="SELECT * FROM airflow.static_babynames LIMIT 10000",
+                destination_filepath='test_airflow.csv',
+                dag=self.dag)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        def test_hive_to_mysql(self):
+            import airflow.operators.hive_to_mysql
+            t = operators.hive_to_mysql.HiveToMySqlTransfer(
+                mysql_conn_id='airflow_db',
+                task_id='hive_to_mysql_check',
+                create=True,
+                sql="""
+                SELECT name
+                FROM airflow.static_babynames
+                LIMIT 100
+                """,
+                mysql_table='test_static_babynames',
+                mysql_preoperator=[
+                    'DROP TABLE IF EXISTS test_static_babynames;',
+                    'CREATE TABLE test_static_babynames (name VARCHAR(500))',
+                ],
+                dag=self.dag)
+            t.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+            t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

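Everything in this new module is defined inside the `if 'AIRFLOW_RUNALL_TESTS' in os.environ:` block, and that guard is evaluated once at import time. To run the Hive/Presto cases, the variable therefore has to be set before the module is first imported; a sketch of that ordering (illustrative, not part of the patch):

    import os

    # Must be set before tests.operators.hive_operator is imported, because the
    # guard around the TestCase definitions runs at import time.
    os.environ['AIRFLOW_RUNALL_TESTS'] = '1'

    import tests.operators.hive_operator  # now defines HiveServer2Test and HivePrestoTest
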
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/operators.py
----------------------------------------------------------------------
diff --git a/tests/operators/operators.py b/tests/operators/operators.py
new file mode 100644
index 0000000..d9bc0c2
--- /dev/null
+++ b/tests/operators/operators.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+
+import datetime
+import os
+import unittest
+import six
+
+from airflow import DAG, configuration, operators, utils
+from airflow.utils.tests import skipUnlessImported
+configuration.test_mode()
+
+import os
+import unittest
+
+
+DEFAULT_DATE = datetime.datetime(2015, 1, 1)
+DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
+DEFAULT_DATE_DS = DEFAULT_DATE_ISO[:10]
+TEST_DAG_ID = 'unit_test_dag'
+
+
+@skipUnlessImported('airflow.operators.mysql_operator', 'MySqlOperator')
+class MySqlTest(unittest.TestCase):
+    def setUp(self):
+        configuration.test_mode()
+        args = {
+            'owner': 'airflow',
+            'mysql_conn_id': 'airflow_db',
+            'start_date': DEFAULT_DATE
+        }
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def mysql_operator_test(self):
+        sql = """
+        CREATE TABLE IF NOT EXISTS test_airflow (
+            dummy VARCHAR(50)
+        );
+        """
+        import airflow.operators.mysql_operator
+        t = operators.mysql_operator.MySqlOperator(
+            task_id='basic_mysql',
+            sql=sql,
+            mysql_conn_id='airflow_db',
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def mysql_operator_test_multi(self):
+        sql = [
+            "TRUNCATE TABLE test_airflow",
+            "INSERT INTO test_airflow VALUES ('X')",
+        ]
+        import airflow.operators.mysql_operator
+        t = operators.mysql_operator.MySqlOperator(
+            task_id='mysql_operator_test_multi',
+            mysql_conn_id='airflow_db',
+            sql=sql, dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_mysql_to_mysql(self):
+        sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
+        import airflow.operators.generic_transfer
+        t = operators.generic_transfer.GenericTransfer(
+            task_id='test_m2m',
+            preoperator=[
+                "DROP TABLE IF EXISTS test_mysql_to_mysql",
+                "CREATE TABLE IF NOT EXISTS "
+                "test_mysql_to_mysql LIKE INFORMATION_SCHEMA.TABLES"
+            ],
+            source_conn_id='airflow_db',
+            destination_conn_id='airflow_db',
+            destination_table="test_mysql_to_mysql",
+            sql=sql,
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_sql_sensor(self):
+        t = operators.sensors.SqlSensor(
+            task_id='sql_sensor_check',
+            conn_id='mysql_default',
+            sql="SELECT count(1) FROM INFORMATION_SCHEMA.TABLES",
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
+class PostgresTest(unittest.TestCase):
+    def setUp(self):
+        configuration.test_mode()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def postgres_operator_test(self):
+        sql = """
+        CREATE TABLE IF NOT EXISTS test_airflow (
+            dummy VARCHAR(50)
+        );
+        """
+        import airflow.operators.postgres_operator
+        t = operators.postgres_operator.PostgresOperator(
+            task_id='basic_postgres', sql=sql, dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+        autocommitTask = operators.postgres_operator.PostgresOperator(
+            task_id='basic_postgres_with_autocommit',
+            sql=sql,
+            dag=self.dag,
+            autocommit=True)
+        autocommitTask.run(
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE,
+            force=True)
+
+
+@skipUnlessImported('airflow.operators.hive_operator', 'HiveOperator')
+@skipUnlessImported('airflow.operators.postgres_operator', 'PostgresOperator')
+class TransferTests(unittest.TestCase):
+    cluster = None
+
+    def setUp(self):
+        configuration.test_mode()
+        args = {'owner': 'airflow', 'start_date': DEFAULT_DATE_ISO}
+        dag = DAG(TEST_DAG_ID, default_args=args)
+        self.dag = dag
+
+    def test_clear(self):
+        self.dag.clear(
+            start_date=DEFAULT_DATE,
+            end_date=datetime.datetime.now())
+
+    def test_mysql_to_hive(self):
+        # import airflow.operators
+        from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+        sql = "SELECT * FROM baby_names LIMIT 1000;"
+        t = MySqlToHiveTransfer(
+            task_id='test_m2h',
+            mysql_conn_id='airflow_ci',
+            hive_cli_conn_id='beeline_default',
+            sql=sql,
+            hive_table='test_mysql_to_hive',
+            recreate=True,
+            delimiter=",",
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)
+
+    def test_mysql_to_hive_partition(self):
+        from airflow.operators.mysql_to_hive import MySqlToHiveTransfer
+        sql = "SELECT * FROM baby_names LIMIT 1000;"
+        t = MySqlToHiveTransfer(
+            task_id='test_m2h',
+            mysql_conn_id='airflow_ci',
+            hive_cli_conn_id='beeline_default',
+            sql=sql,
+            hive_table='test_mysql_to_hive_part',
+            partition={'ds': DEFAULT_DATE_DS},
+            recreate=False,
+            create=True,
+            delimiter=",",
+            dag=self.dag)
+        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

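The skipUnlessImported helper applied to MySqlTest, PostgresTest and TransferTests comes from airflow.utils.tests, which is not shown in this mail. A minimal sketch of what such a decorator can look like (illustrative only; the shipped implementation may differ):

    import importlib
    import unittest

    def skipUnlessImported(module, obj):
        # Skip the decorated TestCase unless `obj` can be imported from `module`.
        try:
            imported = importlib.import_module(module)
        except ImportError:
            imported = None
        return unittest.skipUnless(
            imported is not None and hasattr(imported, obj),
            "Skipping test: could not import {} from {}".format(obj, module))
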
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/sensor.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensor.py b/tests/operators/sensor.py
deleted file mode 100644
index 45d4b81..0000000
--- a/tests/operators/sensor.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import unittest
-
-from airflow.operators import HttpSensor
-from airflow.exceptions import AirflowException
-
-
-class HttpSensorTests(unittest.TestCase):
-
-    def test_poke_exception(self):
-        """
-        Exception occurs in poke function should not be ignored.
-        """
-        def resp_check(resp):
-            raise AirflowException('AirflowException raised here!')
-
-        task = HttpSensor(
-            task_id='http_sensor_poke_exception',
-            http_conn_id='http_default',
-            endpoint='',
-            params={},
-            response_check=resp_check,
-            poke_interval=5)
-        with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
-            task.execute(None)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/851adc55/tests/operators/sensors.py
----------------------------------------------------------------------
diff --git a/tests/operators/sensors.py b/tests/operators/sensors.py
new file mode 100644
index 0000000..025790e
--- /dev/null
+++ b/tests/operators/sensors.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import unittest
+
+from airflow.operators.sensors import HttpSensor
+from airflow.exceptions import AirflowException
+
+
+class HttpSensorTests(unittest.TestCase):
+
+    def test_poke_exception(self):
+        """
+        Exception occurs in poke function should not be ignored.
+        """
+        def resp_check(resp):
+            raise AirflowException('AirflowException raised here!')
+
+        task = HttpSensor(
+            task_id='http_sensor_poke_exception',
+            http_conn_id='http_default',
+            endpoint='',
+            params={},
+            response_check=resp_check,
+            poke_interval=5)
+        with self.assertRaisesRegexp(AirflowException, 'AirflowException raised here!'):
+            task.execute(None)
