"""Gmail tool utils.""" from __future__ import annotations import logging import os from typing import TYPE_CHECKING, List, Optional, Tuple if TYPE_CHECKING: from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import Resource from googleapiclient.discovery import build as build_resource logger = logging.getLogger(__name__) def import_google() -> Tuple[Request, Credentials]: """Import google libraries. Returns: Tuple[Request, Credentials]: Request and Credentials classes. """ # google-auth-httplib2 try: from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials except ImportError: raise ImportError( "You need to install google-auth-httplib2 to use this toolkit. " "Try running pip install --upgrade google-auth-httplib2" ) return Request, Credentials def import_installed_app_flow() -> InstalledAppFlow: """Import InstalledAppFlow class. Returns: InstalledAppFlow: InstalledAppFlow class. """ try: from google_auth_oauthlib.flow import InstalledAppFlow except ImportError: raise ImportError( "You need to install google-auth-oauthlib to use this toolkit. " "Try running pip install --upgrade google-auth-oauthlib" ) return InstalledAppFlow def import_googleapiclient_resource_builder() -> build_resource: """Import googleapiclient.discovery.build function. Returns: build_resource: googleapiclient.discovery.build function. """ try: from googleapiclient.discovery import build except ImportError: raise ImportError( "You need to install googleapiclient to use this toolkit. " "Try running pip install --upgrade google-api-python-client" ) return build DEFAULT_SCOPES = ["https://mail.google.com/"] DEFAULT_CREDS_TOKEN_FILE = "token.json" DEFAULT_CLIENT_SECRETS_FILE = "credentials.json" def get_gmail_credentials( token_file: Optional[str] = None, client_secrets_file: Optional[str] = None, scopes: Optional[List[str]] = None, ) -> Credentials: """Get credentials.""" # From https://developers.google.com/gmail/api/quickstart/python Request, Credentials = import_google() InstalledAppFlow = import_installed_app_flow() creds = None scopes = scopes or DEFAULT_SCOPES token_file = token_file or DEFAULT_CREDS_TOKEN_FILE client_secrets_file = client_secrets_file or DEFAULT_CLIENT_SECRETS_FILE # The file token.json stores the user's access and refresh tokens, and is # created automatically when the authorization flow completes for the first # time. if os.path.exists(token_file): creds = Credentials.from_authorized_user_file(token_file, scopes) # If there are no (valid) credentials available, let the user log in. if not creds or not creds.valid: if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) else: # https://developers.google.com/gmail/api/quickstart/python#authorize_credentials_for_a_desktop_application # noqa flow = InstalledAppFlow.from_client_secrets_file( client_secrets_file, scopes ) creds = flow.run_local_server(port=0) # Save the credentials for the next run with open(token_file, "w") as token: token.write(creds.to_json()) return creds def build_resource_service( credentials: Optional[Credentials] = None, service_name: str = "gmail", service_version: str = "v1", ) -> Resource: """Build a Gmail service.""" credentials = credentials or get_gmail_credentials() builder = import_googleapiclient_resource_builder() return builder(service_name, service_version, credentials=credentials) def clean_email_body(body: str) -> str: """Clean email body.""" try: from bs4 import BeautifulSoup try: soup = BeautifulSoup(str(body), "html.parser") body = soup.get_text() return str(body) except Exception as e: logger.error(e) return str(body) except ImportError: logger.warning("BeautifulSoup not installed. Skipping cleaning.") return str(body)