Hi,
   I am trying to do a simple upload of a file to an existing S3 bucket.
The UI shows a broken DAG, and I am unable to understand the error
message.

[image: Screen Shot 2020-06-04 at 5.00.27 PM.png]

The DAG code is attached. I am not sure if it is some kind of syntax
error (specific to Airflow...). I appreciate your help.

-- 
Thanks,
Srini. <http://csc.lsu.edu/%7Essrini1/>
import logging
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


default_args = {
    'owner': 'srini',
    'start_date': datetime(2020, 6, 4),
    # Note: 'retry_delay' only has an effect together with a 'retries'
    # count; tasks default to 0 retries.
    'retry_delay': timedelta(minutes=5)
}

    
def upload_file_to_S3_with_boto(filename, key, bucket_name):
    # Import boto3 inside the callable so a missing or broken boto3
    # install fails the task at run time instead of DAG parsing.
    import boto3
    s3 = boto3.resource('s3')
    logging.info(f"Uploading file {filename} to s3://{bucket_name}/{key}")
    s3.Bucket(bucket_name).upload_file(filename, key)


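# Alternative sketch, not used by the DAG below: Airflow's built-in
# S3Hook performs the same upload and reads credentials from an Airflow
# connection instead of the machine's boto3 configuration. The
# connection id 'aws_default' is an assumption.
def upload_file_to_s3_with_hook(filename, key, bucket_name):
    from airflow.hooks.S3_hook import S3Hook
    hook = S3Hook(aws_conn_id='aws_default')
    hook.load_file(filename, key, bucket_name=bucket_name, replace=True)
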
dag = DAG(
    # Note: dag_id must not contain spaces; Airflow's key validation
    # only accepts letters, digits, dots, dashes and underscores.
    'upload_one_file_to_s3',
    default_args=default_args,
    schedule_interval='@once'
)


# The DAG has to be defined before operators can reference it, and
# each operator needs dag=dag (or a `with dag:` block) to be attached.
start_task = DummyOperator(
    task_id='dummy_start',
    dag=dag
)


LOCAL_FILE = '/Users/srini/research/cloud/terra/monoliths/test.txt'
TEST_KEY = 'test.txt'
TEST_BUCKET = 'srini-test-bkt-2'


upload_to_s3_task = PythonOperator(
    task_id="upload_file_to_s3",
    python_callable=upload_file_to_S3_with_boto,
    op_kwargs={'filename': LOCAL_FILE, 'key': TEST_KEY, 'bucket_name': TEST_BUCKET},
    dag=dag
)

# The operator variable is upload_to_s3_task (lowercase "s3");
# referencing upload_to_S3_task here raises a NameError at parse time.
start_task >> upload_to_s3_task
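# Quick parse check: running this file directly with plain Python
# ("python <path-to-this-file>") raises the same exception that the
# web UI summarizes in the "Broken DAG" banner.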
