'OAuth authentication in apache airflow (Google Cloud Composer)

I have successfully written an API in Python to read Gmail message, URL from the message, call the URL and store CSV file, however, when I am deploying this in Apache Airflow [Google Cloud Composer] I am getting the below error (shown in the screenshot). I believe it because my code cannot find the token.json and credential.json. I tried many ways, almost did 2 days of research to fix this problem, but couldn't find any solution.

Please note: The dag file and API file are coming from the cloud storage bucket. Even the token.json and credential.json are in the same bucket.

enter image description here

Error in the apache airflow

enter image description here I am using OAuth 2.0 key for the Gmail API:

enter image description here

import os.path
import logging
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import storage
import pandas as pd
import requests
import json
import sys
import csv, os
from datetime import datetime, timedelta
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
# import the required libraries 
from googleapiclient.discovery import build 
from google_auth_oauthlib.flow import InstalledAppFlow 
from google.auth.transport.requests import Request 
import pickle 
import csv
import requests
import pandas as pd
import datetime
from apiclient.discovery import build

# If modifying these scopes, delete the file token.json.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']
    

full_date = datetime.datetime.now()
day = full_date.strftime("%d")
month = full_date.strftime("%b")
Year = full_date.strftime("%Y")

day_month_year = day+" "+month+" "+Year
subject = ""

    
def get_data_from():
    try:
        
        """Shows basic usage of the Gmail API.
        Lists the user's Gmail labels.
        """
        creds = None
        # The file token.json stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the first
        # time.
        
        if os.path.exists('token.json'):
            creds = Credentials.from_authorized_user_file('token.json', SCOPES)
        service = build('gmail', 'v1', credentials=creds)
        #show_chatty_threads(service)
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open('token.json', 'w') as token:
                token.write(creds.to_json())

        service = build('gmail', 'v1', credentials=creds)
        
        # Call the Gmail API
        threads = service.users().threads().list(userId='me').execute().get('threads', [])
        for thread in threads:
            tdata = service.users().threads().get(userId='me', id=thread['id']).execute()
            nmsgs = len(tdata['messages'])
            msg = tdata['messages'][0]['payload']
            #print(msg['headers'])
            subject = ''
            for header in msg['headers']:
                if header['name'] == 'Subject':
                    subject = header['value']
                    
            for header in msg['headers']:
                if header['name'] == 'Date':
                    #print(len(header['value']))
                    date_email = header['value']
                    #if date_email == 'xyz':
                    if day_month_year in date_email:
                        print(date_email)
                        txt = service.users().messages().get(userId='me', id=thread['id']).execute()
                        #print(txt)
                        # Get value of 'payload' from dictionary 'txt' 
                        payload = txt['payload'] 
                        # The Body of the message is in Encrypted format. So, we have to decode it. 
                        # Get the data and decode it with base 64 decoder. 
                        parts = payload.get('parts')[0] 
                        data = parts['body']['data'] 
                        #print(data)
                        data = data.replace("-","+").replace("_","/")
                        #print(data)
                        decoded_data = base64.b64decode(data.encode('utf-8')).decode("utf-8")
                        #print(decoded_data)
                        #print(type(decoded_data))

                        #find URL in email body 
                        URL = re.search("(?P<url>https?://[^\s]+)", decoded_data).group("url")
                        URL = URL.replace(">","")
                        print(URL)
                        req = requests.get(URL)
                        url_content = req.content
                        print(url_content)
                        # Load into a dataframe
                        df = pd.read_excel(url_content)
                        print(df)
                        # Write to csv
                        file_name = subject+"-"+day_month_year+".csv" 
                        print(file_name)
                        bucket.blob('dags/orchestra/xyz/abc_temp/'+f'{file_name}').upload_from_string(df, 'text/csv')
                        df.to_csv(file_name)
                            
    except Exception as e: print(e)


Solution 1:[1]

try add this to 'service' API call cache_discovery=False for the file_cache is unavailable when using oauth2client >= 4.0.0

example:

service = build('gmail', 'v1', credentials=creds, cache_discovery=False)

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1