Using Airflow to upload data to S3
I tried to upload a dataframe containing information about Apple stock (using their API) to S3 as a CSV, using Airflow and PythonOperator. The script is below. When the DAG is launched it appears as a success, but nothing happens at the S3 level. I created three tasks: one for gathering the data, another for creating the S3 bucket, and the last for uploading the dataframe to S3 as a CSV file. The script works well in pure Python.
```python
import json
import logging
import urllib
from datetime import datetime, timedelta
from io import BytesIO
from urllib.request import urlopen

import boto3
import pandas
from botocore.exceptions import ClientError

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.utils.dates import days_ago

# Hook and bucket name are defined at module level so every task reuses them
s3_hook = S3Hook(aws_conn_id="my_conn_S3", region_name="eu-central-1")
bucket_name = "BUCKET_NAME"

default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=0.5),
}

dag = DAG(
    dag_id='apple_finance',
    default_args=default_args,
    schedule_interval=None,
    start_date=days_ago(2),
    tags=['apple-finance'],
)


def get_apple_data():
    api_key = "MY_API_KEY"
    # get quotes
    quotes_url = f"https://financialmodelingprep.com/api/v3/profile/AAPL?apikey={api_key}"
    response = urlopen(quotes_url)
    quotes_data = json.loads(response.read().decode("utf-8"))
    pro_data = {key: val for key, val in quotes_data[0].items() if key in ['companyName', 'price']}
    # get ratings
    rates_url = f"https://financialmodelingprep.com/api/v3/rating/AAPL?apikey={api_key}"
    response_r = urlopen(rates_url)
    rates_data = json.loads(response_r.read().decode("utf-8"))[0]
    rates_data = {key: val for key, val in rates_data.items() if key in ['rating', 'ratingScore', 'ratingRecommendation']}
    pro_data.update(rates_data)
    df = pandas.DataFrame(pro_data, index=[0])
    df['timestamp'] = datetime.now().isoformat()
    # df.to_csv("apple_data.csv")
    return df  # the returned dataframe is pushed to XCom for the upload task


t1 = PythonOperator(
    task_id="get-APPL-data",
    python_callable=get_apple_data,
    dag=dag
)


def create_bucket():
    region = "eu-central-1"
    logger = logging.getLogger("airflow.task")
    try:
        s3_hook.create_bucket(bucket_name=bucket_name, region_name=region)
        logger.info("--------BUCKET CREATED---------")
    except ClientError as e:
        logger.error(e)
        logger.info("----------CAN'T CREATE BUCKET------------")
        return False
    return True


t2 = PythonOperator(
    task_id="create-s3-bucket",
    python_callable=create_bucket,
    dag=dag
)


def sendDataToS3(**kwargs):
    ti = kwargs['ti']
    # pull the dataframe returned by the get-APPL-data task
    apple_df = ti.xcom_pull(task_ids="get-APPL-data")
    csv_buffer = BytesIO()
    apple_df.to_csv(csv_buffer)
    csv_buffer.seek(0)  # rewind before handing the buffer to the hook
    s3_hook.load_file_obj(file_obj=csv_buffer,
                          key=f"apple_data_{datetime.now()}.csv",
                          bucket_name=bucket_name)


t3 = PythonOperator(
    task_id="UploadToS3",
    python_callable=sendDataToS3,
    dag=dag
)

t1 >> t2 >> t3
```
Solution 1:[1]
When you say this runs locally in pure Python, do you mean it is not running as a local DAG?

Also, what is the error? If the script works locally, you are probably hardcoding credentials into it. If that's not the case in your DAG, i.e. you're using a credentials manager, keep in mind that MWAA does not need explicit credentials: its IAM execution role supplies them. If you're getting an AccessDenied error, the MWAA execution role's policy may not allow writing to S3 (s3:PutObject may not be permitted for that policy).
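A minimal sketch of the "let the IAM role provide the credentials" idea (assuming Airflow 2.x with the Amazon provider installed, and an MWAA execution role, or local AWS credential chain, that already allows s3:PutObject on the target bucket): leaving out an explicit aws_conn_id lets the hook fall back to the environment's role. The callable name, bucket, key, and CSV payload below are placeholders.

```python
# Sketch only: assumes the execution role / default AWS credential chain
# already permits s3:PutObject on the target bucket.
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def upload_csv_via_role(**kwargs):
    # The default connection with no stored keys resolves to the
    # environment's IAM role through the normal boto3 credential chain.
    hook = S3Hook()
    hook.load_string(
        string_data="companyName,price\nApple Inc.,170.0\n",  # placeholder CSV payload
        key="apple_data.csv",          # placeholder key
        bucket_name="BUCKET_NAME",     # placeholder bucket
        replace=True,
    )
```

If a task like this still raises an AccessDenied ClientError, the execution role's policy is what needs fixing, not the DAG code.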
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow