[ https://issues.apache.org/jira/browse/AIRFLOW-3568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kaxil Naik resolved AIRFLOW-3568.
---------------------------------
    Resolution: Fixed
    Fix Version/s: 1.10.2

https://github.com/apache/incubator-airflow/pull/4371

> S3ToGoogleCloudStorageOperator failed after succeeding in copying files from s3
> -------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3568
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3568
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 1.10.0
>            Reporter: Yohei Onishi
>            Assignee: Yohei Onishi
>            Priority: Major
>             Fix For: 1.10.2
>
> I tried to copy files from S3 to GCS using S3ToGoogleCloudStorageOperator. The file was successfully uploaded to GCS, but the task failed with the following error.
> {code:java}
> [2018-12-26 07:56:33,062] {discovery.py:871} INFO - URL being requested: POST https://www.googleapis.com/upload/storage/v1/b/stg-rfid-etl-tmp/o?name=rfid_wh%2Fuq%2Fjp%2Fno_resp_carton_1D%2F2018%2F12%2F24%2F21%2Fno_resp_carton_20181224210201.csv&alt=json&uploadType=media
> [2018-12-26 07:56:33,213] {s3_to_gcs_operator.py:177} INFO - All done, uploaded 1 files to Google Cloud Storage
> [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is not JSON serializable
> Traceback (most recent call last):
>   File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
>   File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
>     execution_date=execution_date or self.execution_date)
>   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
>     value = json.dumps(value).encode('UTF-8')
>   File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
>     return _default_encoder.encode(obj)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
>     chunks = self.iterencode(o, _one_shot=True)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
>     return _iterencode(o, 0)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
>     o.__class__.__name__)
> TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> {code}
>
> According to the error log, the task failed because the operator returns the list of uploaded files as a set, which is then pushed to XCom; xcom_push JSON-serializes the return value, and sets are not JSON serializable.
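A minimal sketch of the failure mode, reproducible outside Airflow (the file name is taken from the log above; converting the set to a list before returning is shown only as the general shape of a fix that makes the XCom value JSON-serializable):

```python
import json

# The operator builds the upload list as a set (to subtract the objects
# already present in GCS) and returns it; Airflow then feeds the return
# value to json.dumps when pushing it as an XCom.
files = {"no_resp_carton_20181224210201.csv"}

try:
    json.dumps(files)
except TypeError as exc:
    print(exc)  # exact wording varies by Python version

# A list round-trips through JSON cleanly, so returning list(files)
# instead of the set avoids the serialization error.
print(json.dumps(list(files)))
```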
> [https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/models.py#L1637]
> {code:java}
> # If the task returns a result, push an XCom containing it
> if result is not None:
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
> {code}
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L155]
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L198]
> {code:java}
> files = set(files) - set(existing_files)
> ...
> return files
> {code}


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)