OK, this looks like an easy one to fix :) You can't use `params` as the name of an argument or attribute in a custom operator. That name is already used by `BaseOperator` and has special handling for serialization.
So rename it to `task_params` or anything else, and you should be good. On Thu, Dec 9, 2021 at 9:44 AM Anthony Joyce <[email protected]> wrote: > Hi Daniel- > > After some trial and error, I was able to isolate the issue. It has to do > with my custom operator. See code below: > > from airflow.providers.mysql.hooks.mysql import MySqlHook > from airflow.hooks.postgres_hook import PostgresHook > from airflow.models import BaseOperator > from airflow.utils.decorators import apply_defaults > from contextlib import closing > from typing import Dict, Optional, Union > > class *MySqlToPostgresOperator*(BaseOperator): > """Selects data from a MySQL database and inserts that data into a > PostgreSQL database. Cursors are used to minimize memory usage for > large > queries. > """ > > template_fields = ("sql", "postgres_table", "params") > template_ext = (".sql",) > ui_color = "#944dff" # cool purple > > @*apply_defaults* > def *__init__*( > self, > sql: *str*, > mysql_conn_id: *str* = "mysql_default", > postgres_table: *str* = "", > postgres_conn_id: *str* = "postgres_default", > params: Optional[Dict[*str*, Union[*str*, *int*]]] = *None*, > rows_chunk: *int* = 5000, > *args, > **kwargs, > ): > *super*().__init__(*args, **kwargs) > if params is *None*: > params = {} > self.sql = sql > self.mysql_conn_id = mysql_conn_id > self.postgres_table = postgres_table > self.postgres_conn_id = postgres_conn_id > self.params = params > self.rows_chunk = rows_chunk > > def *execute*(self, context): > """Establish connections to both MySQL & PostgreSQL databases, > open > cursor and begin processing query, loading chunks of rows into > PostgreSQL. Repeat loading chunks until all rows processed for > query. 
> """ > source = MySqlHook(mysql_conn_id=self.mysql_conn_id) > target = PostgresHook(postgres_conn_id=self.postgres_conn_id) > with closing(source.get_conn()) as conn: > with closing(conn.cursor()) as cursor: > cursor.execute(self.sql, self.params) > target_fields = [x[0] for x in cursor.description] > row_count = 0 > rows = cursor.fetchmany(self.rows_chunk) > while *len*(rows) > 0: > row_count += *len*(rows) > target.insert_rows( > self.postgres_table, > rows, > target_fields=target_fields, > commit_every=self.rows_chunk, > ) > rows = cursor.fetchmany(self.rows_chunk) > self.log.info( > f"{row_count} row(s) inserted into > {self.postgres_table}." > ) > > > Thanks, > > Anthony > > > On Dec 9, 2021, at 11:52 AM, Daniel Standish <[email protected]> wrote: > > Can you provide a dag (as simplified as possible) which we can use to > reproduce this error? > > On Thu, Dec 9, 2021 at 8:45 AM Anthony Joyce < > [email protected]> wrote: > >> Hello fellow users- >> >> I have encountered an error which seems to be related to serialization: >> >> Broken DAG: [/home/etl/airflow/dags/airflow_platypus_etl_dag.py] >> Traceback (most recent call last): File >> "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", >> line 574, in serialize_operator serialize_op['params'] = >> cls._serialize_params_dict(op.params) File >> "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", >> line 447, in _serialize_params_dict if >> f'{v.__module__}.{v.__class__.__name__}' == 'airflow.models.param.Param': >> AttributeError: 'str' object has no attribute '__module__' During handling >> of the above exception, another exception occurred: Traceback (most recent >> call last): File >> "/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", >> line 935, in to_dict json_dict = {"__version": cls.SERIALIZER_VERSION, >> "dag": cls.serialize_dag(var)} File >> 
"/home/etl/anaconda3/lib/python3.8/site-packages/airflow/serialization/serialized_objects.py", >> line 847, in serialize_dag raise SerializationError(f'Failed to serialize >> DAG {dag.dag_id!r}: {e}') airflow.exceptions.SerializationError: Failed to >> serialize DAG 'Platypus_ETL': 'str' object has no attribute '__module__' >> >> >> I have spent some time trying to figure out what is going on but to no >> avail. Does anyone have any insight on an error like this? I am on Airflow >> release 2.2.2 and I am using the default package constraints. >> >> Thanks all, >> >> Anthony >> >> >
