mirror of
https://github.com/aljazceru/goose.git
synced 2025-12-18 14:44:21 +01:00
feat: Google Workspace Toolkit (#222)
This commit is contained in:
@@ -15,8 +15,10 @@ dependencies = [
|
||||
"langfuse>=2.38.2",
|
||||
"selenium>=4.0.0",
|
||||
"beautifulsoup4>=4.9.3",
|
||||
"pyshadow<=0.0.5"
|
||||
"pyshadow<=0.0.5",
|
||||
"google-workspace>=0.20.3",
|
||||
]
|
||||
|
||||
author = [{ name = "Block", email = "ai-oss-tools@block.xyz" }]
|
||||
packages = [{ include = "goose", from = "src" }]
|
||||
|
||||
@@ -36,6 +38,7 @@ repo_context = "goose.toolkit.repo_context.repo_context:RepoContext"
|
||||
synopsis = "goose.synopsis.toolkit:SynopsisDeveloper"
|
||||
browser = "goose.toolkit.web_browser:BrowserToolkit"
|
||||
memory = "goose.toolkit.memory:Memory"
|
||||
google_workspace = "goose.toolkit.google_workspace:GoogleWorkspace"
|
||||
|
||||
[project.entry-points."goose.profile"]
|
||||
default = "goose.profile:default_profile"
|
||||
@@ -84,3 +87,10 @@ ai-exchange = { workspace = true }
|
||||
|
||||
[tool.uv.workspace]
|
||||
members = ["packages/*"]
|
||||
|
||||
[project.optional-dependencies]
|
||||
google_workspace = [
|
||||
"google-auth-oauthlib>=1.0.0",
|
||||
"google-auth-httplib2>=0.1.0",
|
||||
"google-api-python-client>=2.86.0",
|
||||
]
|
||||
|
||||
103
src/goose/toolkit/google_workspace.py
Normal file
103
src/goose/toolkit/google_workspace.py
Normal file
@@ -0,0 +1,103 @@
|
||||
import os
|
||||
|
||||
from exchange import Message # type: ignore
|
||||
|
||||
from goose.toolkit.base import Toolkit, tool
|
||||
from goose.tools.gmail_client import GmailClient
|
||||
from goose.tools.google_calendar_client import GoogleCalendarClient
|
||||
from goose.tools.google_oauth_handler import GoogleOAuthHandler
|
||||
|
||||
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly", "https://www.googleapis.com/auth/calendar.readonly"]
|
||||
|
||||
|
||||
def get_file_paths() -> dict[str, str]:
|
||||
return {
|
||||
"CLIENT_SECRETS_FILE": os.path.expanduser("~/.config/goose/google_credentials.json"),
|
||||
"TOKEN_FILE": os.path.expanduser("~/.config/goose/google_oauth_token.json"),
|
||||
}
|
||||
|
||||
|
||||
class GoogleWorkspace(Toolkit):
|
||||
"""A toolkit for integrating with Google APIs"""
|
||||
|
||||
def system(self) -> str:
|
||||
"""Retrieve detailed configuration and procedural guidelines for Jira operations"""
|
||||
template_content = Message.load("prompts/google_workspace.jinja").text
|
||||
return template_content
|
||||
|
||||
def login(self) -> str:
|
||||
try:
|
||||
file_paths = get_file_paths()
|
||||
oauth_handler = GoogleOAuthHandler(file_paths["CLIENT_SECRETS_FILE"], file_paths["TOKEN_FILE"], SCOPES)
|
||||
credentials = oauth_handler.get_credentials()
|
||||
return f"Successfully authenticated with Google! Access token: {credentials.token[:8]}..."
|
||||
except Exception as e:
|
||||
return f"Error: {str(e)}"
|
||||
|
||||
@tool
|
||||
def list_emails(self) -> str:
|
||||
"""List the emails in the user's Gmail inbox, including email IDs"""
|
||||
try:
|
||||
file_paths = get_file_paths()
|
||||
oauth_handler = GoogleOAuthHandler(file_paths["CLIENT_SECRETS_FILE"], file_paths["TOKEN_FILE"], SCOPES)
|
||||
credentials = oauth_handler.get_credentials()
|
||||
gmail_client = GmailClient(credentials)
|
||||
emails = gmail_client.list_emails()
|
||||
return emails
|
||||
except ValueError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
return f"An unexpected error occurred: {str(e)}"
|
||||
|
||||
@tool
|
||||
def get_email_content(self, email_id: str) -> str:
|
||||
"""
|
||||
Get the contents of a single email by its ID.
|
||||
|
||||
Args:
|
||||
email_id (str): The ID of the email to retrieve.
|
||||
|
||||
Returns:
|
||||
response (str): The contents of the email, including subject, sender, and body.
|
||||
"""
|
||||
try:
|
||||
file_paths = get_file_paths()
|
||||
oauth_handler = GoogleOAuthHandler(file_paths["CLIENT_SECRETS_FILE"], file_paths["TOKEN_FILE"], SCOPES)
|
||||
credentials = oauth_handler.get_credentials()
|
||||
gmail_client = GmailClient(credentials)
|
||||
email_content = gmail_client.get_email_content(email_id)
|
||||
return email_content
|
||||
except ValueError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
return f"An unexpected error occurred: {str(e)}"
|
||||
|
||||
@tool
|
||||
def todays_schedule(self) -> str:
|
||||
"""List the events on the user's Google Calendar for today"""
|
||||
try:
|
||||
file_paths = get_file_paths()
|
||||
oauth_handler = GoogleOAuthHandler(file_paths["CLIENT_SECRETS_FILE"], file_paths["TOKEN_FILE"], SCOPES)
|
||||
credentials = oauth_handler.get_credentials()
|
||||
calendar_client = GoogleCalendarClient(credentials)
|
||||
schedule = calendar_client.list_events_for_today()
|
||||
return schedule
|
||||
except ValueError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
return f"An unexpected error occurred: {str(e)}"
|
||||
|
||||
@tool
|
||||
def list_calendars(self) -> str:
|
||||
"""List the calendars in the user's Google Calendar"""
|
||||
try:
|
||||
file_paths = get_file_paths()
|
||||
oauth_handler = GoogleOAuthHandler(file_paths["CLIENT_SECRETS_FILE"], file_paths["TOKEN_FILE"], SCOPES)
|
||||
credentials = oauth_handler.get_credentials()
|
||||
calendar_client = GoogleCalendarClient(credentials)
|
||||
calendars = calendar_client.list_calendars()
|
||||
return calendars
|
||||
except ValueError as e:
|
||||
return f"Error: {str(e)}"
|
||||
except Exception as e:
|
||||
return f"An unexpected error occurred: {str(e)}"
|
||||
3
src/goose/toolkit/prompts/google_workspace.jinja
Normal file
3
src/goose/toolkit/prompts/google_workspace.jinja
Normal file
@@ -0,0 +1,3 @@
|
||||
When asked about your email use the list_emails tool.
|
||||
When asked for today's schedule used the todays_schedule tool.
|
||||
Please always list the email ID in your responses.
|
||||
129
src/goose/tools/gmail_client.py
Normal file
129
src/goose/tools/gmail_client.py
Normal file
@@ -0,0 +1,129 @@
|
||||
import base64
|
||||
from datetime import datetime
|
||||
|
||||
from googleapiclient.discovery import build
|
||||
|
||||
|
||||
class GmailClient:
|
||||
def __init__(self, credentials: dict) -> None:
|
||||
self.service = build("gmail", "v1", credentials=credentials)
|
||||
|
||||
def list_emails(self, max_results: int = 10) -> str:
|
||||
"""List the emails in the user's Gmail inbox"""
|
||||
try:
|
||||
results = self.service.users().messages().list(userId="me", maxResults=max_results).execute()
|
||||
messages = results.get("messages", [])
|
||||
|
||||
if not messages:
|
||||
return "No messages found."
|
||||
else:
|
||||
output = "Recent emails:\n"
|
||||
for message in messages:
|
||||
msg = self.service.users().messages().get(userId="me", id=message["id"]).execute()
|
||||
subject = next(
|
||||
(header["value"] for header in msg["payload"]["headers"] if header["name"] == "Subject"),
|
||||
"No subject",
|
||||
)
|
||||
sender = next(
|
||||
(header["value"] for header in msg["payload"]["headers"] if header["name"] == "From"),
|
||||
"Unknown sender",
|
||||
)
|
||||
output += f"ID: {message['id']}\nFrom: {sender}\nSubject: {subject}\n\n"
|
||||
return output
|
||||
except Exception as e:
|
||||
return f"Error listing emails: {str(e)}"
|
||||
|
||||
def get_email_content(self, email_id: str) -> str:
|
||||
"""Get the contents of an email by its ID"""
|
||||
try:
|
||||
message = self.service.users().messages().get(userId="me", id=email_id, format="full").execute()
|
||||
|
||||
headers = message["payload"]["headers"]
|
||||
subject = next((header["value"] for header in headers if header["name"] == "Subject"), "No subject")
|
||||
sender = next((header["value"] for header in headers if header["name"] == "From"), "Unknown sender")
|
||||
|
||||
if "parts" in message["payload"]:
|
||||
parts = message["payload"]["parts"]
|
||||
body = next((part["body"]["data"] for part in parts if part["mimeType"] == "text/plain"), None)
|
||||
else:
|
||||
body = message["payload"]["body"]["data"]
|
||||
|
||||
if body:
|
||||
decoded_body = base64.urlsafe_b64decode(body.encode("ASCII")).decode("utf-8")
|
||||
else:
|
||||
decoded_body = "No plain text content found in the email."
|
||||
|
||||
return f"From: {sender}\nSubject: {subject}\n\nBody:\n{decoded_body}"
|
||||
except Exception as e:
|
||||
return f"Error retrieving email: {str(e)}"
|
||||
|
||||
def _format_email_date(self, date_str: str) -> str:
|
||||
try:
|
||||
date_obj = datetime.fromtimestamp(int(date_str) / 1000.0)
|
||||
return date_obj.strftime("%Y-%m-%d %H:%M:%S")
|
||||
except Exception:
|
||||
return date_str
|
||||
|
||||
def _get_email_content(self, msg_id: str) -> dict:
|
||||
try:
|
||||
message = self.service.users().messages().get(userId="me", id=msg_id, format="full").execute()
|
||||
|
||||
headers = message["payload"]["headers"]
|
||||
subject = next((h["value"] for h in headers if h["name"].lower() == "subject"), "No Subject")
|
||||
from_header = next((h["value"] for h in headers if h["name"].lower() == "from"), "Unknown Sender")
|
||||
date = self._format_email_date(message["internalDate"])
|
||||
|
||||
# Get email body
|
||||
if "parts" in message["payload"]:
|
||||
parts = message["payload"]["parts"]
|
||||
body = ""
|
||||
for part in parts:
|
||||
if part["mimeType"] == "text/plain":
|
||||
if "data" in part["body"]:
|
||||
body += base64.urlsafe_b64decode(part["body"]["data"].encode("ASCII")).decode("utf-8")
|
||||
else:
|
||||
if "data" in message["payload"]["body"]:
|
||||
# NOTE: Trunace the body to 100 characters.
|
||||
# TODO: Add ability to look up specific emails.
|
||||
body = base64.urlsafe_b64decode(message["payload"]["body"]["data"].encode("ASCII")).decode("utf-8")[
|
||||
0:100
|
||||
]
|
||||
else:
|
||||
body = "No content"
|
||||
|
||||
return {"subject": subject, "from": from_header, "date": date, "body": body}
|
||||
except Exception as e:
|
||||
return {"error": f"Error fetching email content: {str(e)}"}
|
||||
|
||||
# def list_emails(self, max_results: int = 10, output_format: str = "text") -> str:
|
||||
# try:
|
||||
# results = self.service.users().messages().list(userId="me", maxResults=max_results).execute()
|
||||
# messages = results.get("messages", [])
|
||||
|
||||
# if not messages:
|
||||
# return "No emails found."
|
||||
|
||||
# emails = []
|
||||
# for message in messages:
|
||||
# email_content = self._get_email_content(message["id"])
|
||||
# emails.append(email_content)
|
||||
|
||||
# if output_format == "json":
|
||||
# return json.dumps(emails, indent=2)
|
||||
|
||||
# # Format as text
|
||||
# text_output = []
|
||||
# for email in emails:
|
||||
# text_output.append(f"\nSubject: {email['subject']}")
|
||||
# text_output.append(f"From: {email['from']}")
|
||||
# text_output.append(f"Date: {email['date']}")
|
||||
# text_output.append("\nBody:")
|
||||
# text_output.append(email["body"])
|
||||
# text_output.append("\n" + "=" * 50)
|
||||
|
||||
# return "\n".join(text_output)
|
||||
|
||||
# except HttpError as error:
|
||||
# raise ValueError(f"Error accessing Gmail: {str(error)}")
|
||||
# except HttpError as error:
|
||||
# raise ValueError(f"Error accessing Gmail: {str(error)}")
|
||||
50
src/goose/tools/google_calendar_client.py
Normal file
50
src/goose/tools/google_calendar_client.py
Normal file
@@ -0,0 +1,50 @@
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from googleapiclient.discovery import build
|
||||
from googleapiclient.errors import HttpError
|
||||
|
||||
|
||||
class GoogleCalendarClient:
|
||||
def __init__(self, credentials: dict) -> None:
|
||||
self.creds = credentials
|
||||
self.service = build("calendar", "v3", credentials=credentials)
|
||||
|
||||
def list_calendars(self) -> List[Dict[str, Any]]:
|
||||
try:
|
||||
calendars_result = self.service.calendarList().list().execute()
|
||||
calendars = calendars_result.get("items", [])
|
||||
return calendars
|
||||
except HttpError as error:
|
||||
print(f"An error occurred: {error}")
|
||||
return []
|
||||
|
||||
def list_events_for_today(self) -> List[Dict[str, Any]]:
|
||||
try:
|
||||
# Get the start and end of the current day in UTC
|
||||
now = datetime.now(ZoneInfo("UTC"))
|
||||
start_of_day = now.replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
end_of_day = start_of_day + timedelta(days=1)
|
||||
|
||||
# Convert to RFC3339 format
|
||||
time_min = start_of_day.isoformat()
|
||||
time_max = end_of_day.isoformat()
|
||||
|
||||
# Call the Calendar API
|
||||
events_result = (
|
||||
self.service.events()
|
||||
.list(calendarId="primary", timeMin=time_min, timeMax=time_max, singleEvents=True, orderBy="startTime")
|
||||
.execute()
|
||||
)
|
||||
events = events_result.get("items", [])
|
||||
|
||||
if not events:
|
||||
print("No events found for today.")
|
||||
return []
|
||||
|
||||
return events
|
||||
|
||||
except HttpError as error:
|
||||
print(f"An error occurred: {error}")
|
||||
return []
|
||||
146
src/goose/tools/google_oauth_handler.py
Normal file
146
src/goose/tools/google_oauth_handler.py
Normal file
@@ -0,0 +1,146 @@
|
||||
import json
|
||||
import os
|
||||
import urllib.parse
|
||||
import webbrowser
|
||||
from http.server import BaseHTTPRequestHandler, HTTPServer
|
||||
from threading import Thread
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import google_auth_oauthlib.flow
|
||||
from google.oauth2.credentials import Credentials
|
||||
|
||||
REDIRECT_PORT = 8000
|
||||
|
||||
|
||||
class OAuthConfig:
|
||||
def __init__(self, client_secrets_file: str, token_file: str, scopes: List[str]) -> None:
|
||||
self.client_secrets_file: str = client_secrets_file
|
||||
self.token_file: str = token_file
|
||||
self.scopes: List[str] = scopes
|
||||
self.auth_success_message: str = """
|
||||
<html>
|
||||
<body>
|
||||
<h1>Authentication Successful!</h1>
|
||||
<p>You can now close this window and return to the terminal.</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
|
||||
class OAuthCallbackHandler(BaseHTTPRequestHandler):
|
||||
def __init__(self, *args: Any, state: Optional[str] = None, **kwargs: Any) -> None: # noqa: ANN401
|
||||
self.state: Optional[str] = state
|
||||
self.credentials: Optional[Credentials] = None
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def do_GET(self) -> None: # noqa: N802
|
||||
query_components = urllib.parse.parse_qs(urllib.parse.urlparse(self.path).query)
|
||||
|
||||
received_state = query_components.get("state", [""])[0]
|
||||
if received_state != self.state:
|
||||
self.send_error(400, "State mismatch. Possible CSRF attack.")
|
||||
return
|
||||
|
||||
code = query_components.get("code", [""])[0]
|
||||
if not code:
|
||||
self.send_error(400, "No authorization code received.")
|
||||
return
|
||||
|
||||
try:
|
||||
flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
|
||||
self.server.oauth_config.client_secrets_file,
|
||||
scopes=self.server.oauth_config.scopes,
|
||||
state=received_state,
|
||||
)
|
||||
|
||||
flow.redirect_uri = f"http://localhost:{self.server.server_port}/auth/google/callback/"
|
||||
flow.fetch_token(code=code)
|
||||
|
||||
credentials_dict = credentials_to_dict(flow.credentials)
|
||||
with open(self.server.oauth_config.token_file, "w") as token_file:
|
||||
json.dump(credentials_dict, token_file)
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header("Content-type", "text/html")
|
||||
self.end_headers()
|
||||
self.wfile.write(self.server.oauth_config.auth_success_message.encode())
|
||||
|
||||
self.server.credentials = flow.credentials
|
||||
|
||||
except Exception as e:
|
||||
self.send_error(500, f"Error exchanging authorization code: {str(e)}")
|
||||
|
||||
def log_message(self, format: str, *args: Any) -> None: # noqa: ANN401
|
||||
pass
|
||||
|
||||
|
||||
def credentials_to_dict(credentials: Credentials) -> Dict[str, Any]:
|
||||
return {
|
||||
"token": credentials.token,
|
||||
"refresh_token": credentials.refresh_token,
|
||||
"token_uri": credentials.token_uri,
|
||||
"client_id": credentials.client_id,
|
||||
"client_secret": credentials.client_secret,
|
||||
"scopes": credentials.scopes,
|
||||
}
|
||||
|
||||
|
||||
class GoogleOAuthHandler:
|
||||
def __init__(self, client_secrets_file: str, token_file: str, scopes: List[str]) -> None:
|
||||
self.oauth_config: OAuthConfig = OAuthConfig(client_secrets_file, token_file, scopes)
|
||||
|
||||
def get_credentials(self) -> Credentials:
|
||||
if os.path.exists(self.oauth_config.token_file):
|
||||
with open(self.oauth_config.token_file, "r") as token_file:
|
||||
creds_dict = json.load(token_file)
|
||||
return Credentials(
|
||||
token=creds_dict["token"],
|
||||
refresh_token=creds_dict["refresh_token"],
|
||||
token_uri=creds_dict["token_uri"],
|
||||
client_id=creds_dict["client_id"],
|
||||
client_secret=creds_dict["client_secret"],
|
||||
scopes=creds_dict["scopes"],
|
||||
)
|
||||
|
||||
return self._authenticate_user()
|
||||
|
||||
def _save_token(self, credentials_dict: Dict[str, Any]) -> None:
|
||||
os.makedirs(os.path.dirname(self.oauth_config.token_file), exist_ok=True)
|
||||
with open(self.oauth_config.token_file, "w") as token_file:
|
||||
json.dump(credentials_dict, token_file)
|
||||
|
||||
def _authenticate_user(self) -> Credentials:
|
||||
port = REDIRECT_PORT
|
||||
redirect_uri = f"http://localhost:{port}/auth/google/callback/"
|
||||
|
||||
flow = google_auth_oauthlib.flow.Flow.from_client_secrets_file(
|
||||
self.oauth_config.client_secrets_file, scopes=self.oauth_config.scopes
|
||||
)
|
||||
flow.redirect_uri = redirect_uri
|
||||
|
||||
auth_url, state = flow.authorization_url(access_type="offline", include_granted_scopes="true", prompt="consent")
|
||||
|
||||
server_address = ("", port)
|
||||
httpd = HTTPServer(server_address, lambda *args, **kwargs: OAuthCallbackHandler(*args, state=state, **kwargs))
|
||||
httpd.oauth_config = self.oauth_config
|
||||
httpd.credentials = None
|
||||
|
||||
server_thread = Thread(target=httpd.serve_forever)
|
||||
server_thread.daemon = True
|
||||
server_thread.start()
|
||||
|
||||
print(f"Listening on port {port}")
|
||||
print("Opening browser for authentication...")
|
||||
webbrowser.open(auth_url)
|
||||
|
||||
print("Waiting for authentication...")
|
||||
while httpd.credentials is None:
|
||||
pass
|
||||
|
||||
httpd.shutdown()
|
||||
server_thread.join()
|
||||
|
||||
credentials = httpd.credentials
|
||||
self._save_token(credentials_to_dict(credentials))
|
||||
|
||||
return credentials
|
||||
135
tests/toolkit/test_google_workspace.py
Normal file
135
tests/toolkit/test_google_workspace.py
Normal file
@@ -0,0 +1,135 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from goose.toolkit.google_workspace import GoogleWorkspace
|
||||
from goose.tools.google_oauth_handler import GoogleOAuthHandler
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def google_workspace_toolkit():
|
||||
return GoogleWorkspace(notifier=MagicMock())
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_credentials():
|
||||
mock_creds = MagicMock()
|
||||
mock_creds.token = "mock_token"
|
||||
return mock_creds
|
||||
|
||||
|
||||
def test_google_workspace_init(google_workspace_toolkit):
|
||||
assert isinstance(google_workspace_toolkit, GoogleWorkspace)
|
||||
|
||||
|
||||
@patch.object(GoogleOAuthHandler, "get_credentials")
|
||||
def test_login(mock_get_credentials, google_workspace_toolkit, mock_credentials):
|
||||
mock_get_credentials.return_value = mock_credentials
|
||||
result = google_workspace_toolkit.login()
|
||||
assert "Successfully authenticated with Google!" in result
|
||||
assert "Access token: mock_tok..." in result
|
||||
|
||||
|
||||
@patch.object(GoogleOAuthHandler, "get_credentials")
|
||||
def test_login_error(mock_get_credentials, google_workspace_toolkit):
|
||||
mock_get_credentials.side_effect = ValueError("Test error")
|
||||
result = google_workspace_toolkit.login()
|
||||
assert "Error: Test error" in result
|
||||
|
||||
|
||||
@patch("goose.toolkit.google_workspace.get_file_paths")
|
||||
def test_file_paths(mock_get_file_paths):
|
||||
mock_get_file_paths.return_value = {
|
||||
"CLIENT_SECRETS_FILE": "/mock/home/path/.config/goose/google_credentials.json",
|
||||
"TOKEN_FILE": "/mock/home/path/.config/goose/google_oauth_token.json",
|
||||
}
|
||||
from goose.toolkit.google_workspace import get_file_paths
|
||||
|
||||
file_paths = get_file_paths()
|
||||
assert file_paths["CLIENT_SECRETS_FILE"] == "/mock/home/path/.config/goose/google_credentials.json"
|
||||
assert file_paths["TOKEN_FILE"] == "/mock/home/path/.config/goose/google_oauth_token.json"
|
||||
|
||||
|
||||
def test_list_emails(mocker, google_workspace_toolkit):
|
||||
# Mock get_file_paths
|
||||
mock_get_file_paths = mocker.patch("goose.toolkit.google_workspace.get_file_paths")
|
||||
mock_get_file_paths.return_value = {
|
||||
"CLIENT_SECRETS_FILE": "/mock/home/path/.config/goose/google_credentials.json",
|
||||
"TOKEN_FILE": "/mock/home/path/.config/goose/google_oauth_token.json",
|
||||
}
|
||||
|
||||
# Mock GoogleOAuthHandler
|
||||
mock_google_oauth_handler = mocker.patch("goose.toolkit.google_workspace.GoogleOAuthHandler")
|
||||
mock_credentials = mocker.MagicMock()
|
||||
mock_google_oauth_handler.return_value.get_credentials.return_value = mock_credentials
|
||||
|
||||
# Mock GmailClient
|
||||
mock_gmail_client = mocker.patch("goose.toolkit.google_workspace.GmailClient")
|
||||
mock_gmail_client.return_value.list_emails.return_value = "mock_emails"
|
||||
|
||||
# Call the method
|
||||
result = google_workspace_toolkit.list_emails()
|
||||
|
||||
# Assertions
|
||||
assert result == "mock_emails"
|
||||
mock_get_file_paths.assert_called_once()
|
||||
mock_google_oauth_handler.assert_called_once_with(
|
||||
"/mock/home/path/.config/goose/google_credentials.json",
|
||||
"/mock/home/path/.config/goose/google_oauth_token.json",
|
||||
["https://www.googleapis.com/auth/gmail.readonly", "https://www.googleapis.com/auth/calendar.readonly"],
|
||||
)
|
||||
mock_google_oauth_handler.return_value.get_credentials.assert_called_once()
|
||||
mock_gmail_client.assert_called_once_with(mock_credentials)
|
||||
mock_gmail_client.return_value.list_emails.assert_called_once()
|
||||
|
||||
|
||||
def test_todays_schedule(mocker, google_workspace_toolkit):
|
||||
mock_calendar_client = mocker.Mock()
|
||||
mock_calendar_client.list_events_for_today.return_value = [
|
||||
{
|
||||
"summary": "Test Event 1",
|
||||
"start": {"dateTime": "2023-05-01T09:00:00"},
|
||||
"end": {"dateTime": "2023-05-01T10:00:00"},
|
||||
},
|
||||
{
|
||||
"summary": "Test Event 2",
|
||||
"start": {"dateTime": "2023-05-01T14:00:00"},
|
||||
"end": {"dateTime": "2023-05-01T15:00:00"},
|
||||
},
|
||||
]
|
||||
mocker.patch("goose.toolkit.google_workspace.GoogleCalendarClient", return_value=mock_calendar_client)
|
||||
mocker.patch(
|
||||
"goose.toolkit.google_workspace.get_file_paths",
|
||||
return_value={"CLIENT_SECRETS_FILE": "mock_path", "TOKEN_FILE": "mock_path"},
|
||||
)
|
||||
mocker.patch("goose.toolkit.google_workspace.GoogleOAuthHandler")
|
||||
|
||||
result = google_workspace_toolkit.todays_schedule()
|
||||
|
||||
assert isinstance(result, list)
|
||||
assert len(result) == 2
|
||||
assert result[0]["summary"] == "Test Event 1"
|
||||
assert result[1]["summary"] == "Test Event 2"
|
||||
|
||||
|
||||
def test_list_calendars(mocker, google_workspace_toolkit):
|
||||
mock_calendar_client = mocker.Mock()
|
||||
mock_calendar_client.list_calendars.return_value = [
|
||||
{"summary": "Calendar 1", "id": "calendar1@example.com"},
|
||||
{"summary": "Calendar 2", "id": "calendar2@example.com"},
|
||||
]
|
||||
mocker.patch("goose.toolkit.google_workspace.GoogleCalendarClient", return_value=mock_calendar_client)
|
||||
mocker.patch(
|
||||
"goose.toolkit.google_workspace.get_file_paths",
|
||||
return_value={"CLIENT_SECRETS_FILE": "mock_path", "TOKEN_FILE": "mock_path"},
|
||||
)
|
||||
mocker.patch("goose.toolkit.google_workspace.GoogleOAuthHandler")
|
||||
|
||||
result = google_workspace_toolkit.list_calendars()
|
||||
|
||||
assert isinstance(result, list)
|
||||
assert len(result) == 2
|
||||
assert result[0]["summary"] == "Calendar 1"
|
||||
assert result[1]["summary"] == "Calendar 2"
|
||||
assert result[0]["id"] == "calendar1@example.com"
|
||||
assert result[1]["id"] == "calendar2@example.com"
|
||||
Reference in New Issue
Block a user