[ 
https://issues.apache.org/jira/browse/AIRFLOW-3568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16729400#comment-16729400
 ] 

jack edited comment on AIRFLOW-3568 at 12/27/18 7:31 AM:
---------------------------------------------------------

[~yohei] No problem. Note that it was approved but not yet merged. It might be 
a few more days, as other committers may have more comments.

You are more than welcome to check the Jira for more issues to fix.


was (Author: jackjack10):
[~yohei] No problem. Note that it was approved but not yet merged. It might be 
a few more days, as other committers may have more comments. 

You are more than welcome to search more issues with the gcp label to fix:

https://issues.apache.org/jira/browse/AIRFLOW-3503?jql=project%20%3D%20AIRFLOW%20AND%20labels%20%3D%20gcp

> S3ToGoogleCloudStorageOperator failed after succeeding in copying files from 
> s3
> -------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3568
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3568
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: contrib
>    Affects Versions: 1.10.0
>            Reporter: Yohei Onishi
>            Assignee: Yohei Onishi
>            Priority: Major
>
> I tried to copy files from s3 to gcs using 
> S3ToGoogleCloudStorageOperator. The file successfully was uploaded to GCS but 
> the task failed with the following error.
> {code:java}
> [2018-12-26 07:56:33,062] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3 [2018-12-26 07:56:33,062] {discovery.py:871} INFO - 
> URL being requested: POST 
> https://www.googleapis.com/upload/storage/v1/b/stg-rfid-etl-tmp/o?name=rfid_wh%2Fuq%2Fjp%2Fno_resp_carton_1D%2F2018%2F12%2F24%2F21%2Fno_resp_carton_20181224210201.csv&alt=json&uploadType=media
> [2018-12-26 07:56:33,214] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3 [2018-12-26 07:56:33,213] {s3_to_gcs_operator.py:177} 
> INFO - All done, uploaded 1 files to Google Cloud Storage
> [2018-12-26 07:56:33,217] {models.py:1736} ERROR - Object of type 'set' is 
> not JSON serializable
> Traceback (most recent call last):
>   File "/usr/local/lib/airflow/airflow/models.py", line 1637, in _run_raw_task
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
>   File "/usr/local/lib/airflow/airflow/models.py", line 1983, in xcom_push
>     execution_date=execution_date or self.execution_date)
>   File "/usr/local/lib/airflow/airflow/utils/db.py", line 74, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/local/lib/airflow/airflow/models.py", line 4531, in set
>     value = json.dumps(value).encode('UTF-8')
>   File "/usr/local/lib/python3.6/json/__init__.py", line 231, in dumps
>     return _default_encoder.encode(obj)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 199, in encode
>     chunks = self.iterencode(o, _one_shot=True)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 257, in iterencode
>     return _iterencode(o, 0)
>   File "/usr/local/lib/python3.6/json/encoder.py", line 180, in default
>     o.__class__.__name__)
> TypeError: Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3 [2018-12-26 07:56:33,217] {models.py:1736} ERROR - 
> Object of type 'set' is not JSON serializable
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3 Traceback (most recent call last):
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", 
> line 1637, in _run_raw_task
> [2018-12-26 07:56:33,220] {models.py:1756} INFO - Marking task as UP_FOR_RETRY
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
> [2018-12-26 07:56:33,220] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", 
> line 1983, in xcom_push
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     execution_date=execution_date or 
> self.execution_date)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/utils/db.py", 
> line 74, in wrapper
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     return func(*args, **kwargs)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/airflow/airflow/models.py", 
> line 4531, in set
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     value = json.dumps(value).encode('UTF-8')
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/__init__.py", 
> line 231, in dumps
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     return _default_encoder.encode(obj)
> [2018-12-26 07:56:33,221] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", 
> line 199, in encode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     chunks = self.iterencode(o, _one_shot=True)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", 
> line 257, in iterencode
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     return _iterencode(o, 0)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3   File "/usr/local/lib/python3.6/json/encoder.py", 
> line 180, in default
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3     o.__class__.__name__)
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3 TypeError: Object of type 'set' is not JSON 
> serializable
> [2018-12-26 07:56:33,222] {base_task_runner.py:107} INFO - Job 39: Subtask 
> gcs_copy_files_from_s3 [2018-12-26 07:56:33,220] {models.py:1756} INFO - 
> Marking task as UP_FOR_RETRY
> {code}
>  
> According to the error log, the task failed because the operator returns the 
> list of uploaded files as a set, which is then pushed to XCom; xcom_push 
> serializes the value with json.dumps, which does not support sets.
> [https://github.com/apache/incubator-airflow/blob/1.10.0/airflow/models.py#L1637]
> {code:java}
> # If the task returns a result, push an XCom containing it
> if result is not None:
>     self.xcom_push(key=XCOM_RETURN_KEY, value=result){code}
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L155]
> [https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/s3_to_gcs_operator.py#L198]
> {code:java}
> files = set(files) - set(existing_files)
> ...
> return files{code}
>  
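The failure is easy to reproduce outside Airflow. A minimal sketch (the file name below is illustrative, not from the actual run):

{code:python}
import json

# The operator returns the uploaded file names as a set,
# which is what ends up being passed to xcom_push.
files = {"no_resp_carton_20181224210201.csv"}

try:
    json.dumps(files)  # this is what XCom's set() does internally
except TypeError as err:
    print(err)  # set is not JSON serializable

# Converting to a list before returning serializes fine.
json.dumps(list(files))
{code}

So a fix on the operator side would be to return {{list(files)}} instead of the raw set.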



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
