OAuth authentication in Apache Airflow (Google Cloud Composer)
I have successfully written an API in Python to read a Gmail message, extract the URL from the message, call the URL, and store the resulting CSV file. However, when I deploy this in Apache Airflow [Google Cloud Composer] I get the error shown in the screenshot below. I believe it is because my code cannot find token.json and credentials.json. I have tried many approaches over almost two days of research, but could not find a solution.
Please note: The dag file and API file are coming from the cloud storage bucket. Even the token.json and credential.json are in the same bucket.
Error in the apache airflow
I am using OAuth 2.0 key for the Gmail API:
import base64
import csv
import io
import json
import logging
import os
import os.path
import pickle
import re
import sys
# NOTE: the name `datetime` bound by this from-import is intentionally
# rebound by `import datetime` below; module code relies on
# `datetime.datetime.now()`, so this order must be preserved.
from datetime import datetime, timedelta
import datetime

import pandas as pd
import requests
from google.auth.transport.requests import Request
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
# Deprecated alias of googleapiclient; kept for backward compatibility.
from apiclient.discovery import build
# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Today's date rendered as e.g. "07 Mar 2024"; used to filter emails by date.
full_date = datetime.datetime.now()
day, month, Year = (full_date.strftime(fmt) for fmt in ("%d", "%b", "%Y"))
day_month_year = " ".join([day, month, Year])

# Most recent email subject; reassigned per-message inside get_data_from().
subject = ""
def get_data_from():
    """Read today's Gmail threads, extract the first URL from each matching
    message body, download the linked spreadsheet and store it as CSV.

    Credentials come from ``token.json`` (saved access/refresh tokens) and
    ``credentials.json`` (OAuth client secrets). Both paths are relative to
    the worker's CWD — on Cloud Composer they must be on a path visible to
    the Airflow worker, not merely present in the GCS bucket.

    Returns:
        None. Side effects only: may rewrite ``token.json``, uploads a CSV
        blob to GCS and writes a local CSV copy per matching message.
    """
    try:
        creds = None
        # token.json stores the user's access and refresh tokens; it is
        # created automatically when the authorization flow first completes.
        if os.path.exists('token.json'):
            creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        # If there are no (valid) credentials available, refresh them or
        # let the user log in via the installed-app flow.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run.
            with open('token.json', 'w') as token:
                token.write(creds.to_json())
        # cache_discovery=False avoids the "file_cache is unavailable when
        # using oauth2client >= 4.0.0" error seen on Cloud Composer.
        service = build('gmail', 'v1', credentials=creds,
                        cache_discovery=False)

        threads = (service.users().threads().list(userId='me')
                   .execute().get('threads', []))
        for thread in threads:
            tdata = (service.users().threads()
                     .get(userId='me', id=thread['id']).execute())
            msg = tdata['messages'][0]['payload']
            subject = ''
            date_email = ''  # initialized so a missing Date header is harmless
            for header in msg['headers']:
                if header['name'] == 'Subject':
                    subject = header['value']
                elif header['name'] == 'Date':
                    date_email = header['value']
            # Only process messages dated today.
            if day_month_year not in date_email:
                continue
            txt = (service.users().messages()
                   .get(userId='me', id=thread['id']).execute())
            # The body is URL-safe base64; decode it to text.
            parts = txt['payload'].get('parts')[0]
            data = parts['body']['data']
            decoded_data = base64.urlsafe_b64decode(
                data.encode('utf-8')).decode('utf-8')
            # First URL in the body; strip a trailing '>' left by
            # angle-bracketed links in plain-text mail.
            match = re.search(r"(?P<url>https?://[^\s]+)", decoded_data)
            if match is None:
                continue  # message contains no link — skip it
            URL = match.group('url').replace('>', '')
            req = requests.get(URL)
            # read_excel wants a path or file-like object, not raw bytes.
            df = pd.read_excel(io.BytesIO(req.content))
            file_name = subject + '-' + day_month_year + '.csv'
            # upload_from_string requires str/bytes — serialize the frame
            # to CSV text rather than passing the DataFrame object.
            # NOTE(review): `bucket` is not defined in this module —
            # presumably a google.cloud.storage Bucket built elsewhere;
            # confirm before deploying.
            bucket.blob('dags/orchestra/xyz/abc_temp/' + f'{file_name}') \
                .upload_from_string(df.to_csv(index=False), 'text/csv')
            df.to_csv(file_name)  # local copy for debugging/inspection
    except Exception as e:
        # Best-effort task: log with traceback instead of silently printing,
        # but do not crash the surrounding DAG run.
        logging.exception(e)
Solution 1:[1]
Try adding cache_discovery=False to the 'service' API build call,
because the file cache is unavailable when using oauth2client >= 4.0.0.
Example:
service = build('gmail', 'v1', credentials=creds, cache_discovery=False)
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |