mirror of
https://github.com/hwchase17/langchain
synced 2024-11-02 09:40:22 +00:00
f9f5626ca4
**Description:** Bugfix: Langchain_community's GitHub Api wrapper throws a TypeError when searching for issues and/or PRs (the `search_issues_and_prs` method). This is because PyGithub's PageinatedList type does not support the len() method. See https://github.com/PyGithub/PyGithub/issues/1476 ![image](https://github.com/langchain-ai/langchain/assets/8849021/57390b11-ed41-4f48-ba50-f3028610789c) **Dependencies:** None **Twitter handle**: @ChrisKeoghNZ I haven't registered an issue as it would take me longer to fill the template out than to make the fix, but I'm happy to if that's deemed essential. I've added a simple integration test to cover this as there were no existing unit tests and it was going to be tricky to set them up. Co-authored-by: Chris Keogh <chris.keogh@xero.com>
855 lines
32 KiB
Python
855 lines
32 KiB
Python
"""Util that calls GitHub."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
|
|
|
import requests
|
|
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
|
|
from langchain_core.utils import get_from_dict_or_env
|
|
|
|
if TYPE_CHECKING:
|
|
from github.Issue import Issue
|
|
from github.PullRequest import PullRequest
|
|
|
|
|
|
def _import_tiktoken() -> Any:
|
|
"""Import tiktoken."""
|
|
try:
|
|
import tiktoken
|
|
except ImportError:
|
|
raise ImportError(
|
|
"tiktoken is not installed. "
|
|
"Please install it with `pip install tiktoken`"
|
|
)
|
|
return tiktoken
|
|
|
|
|
|
class GitHubAPIWrapper(BaseModel):
|
|
"""Wrapper for GitHub API."""
|
|
|
|
github: Any #: :meta private:
|
|
github_repo_instance: Any #: :meta private:
|
|
github_repository: Optional[str] = None
|
|
github_app_id: Optional[str] = None
|
|
github_app_private_key: Optional[str] = None
|
|
active_branch: Optional[str] = None
|
|
github_base_branch: Optional[str] = None
|
|
|
|
class Config:
|
|
"""Configuration for this pydantic object."""
|
|
|
|
extra = Extra.forbid
|
|
|
|
@root_validator()
|
|
def validate_environment(cls, values: Dict) -> Dict:
|
|
"""Validate that api key and python package exists in environment."""
|
|
github_repository = get_from_dict_or_env(
|
|
values, "github_repository", "GITHUB_REPOSITORY"
|
|
)
|
|
|
|
github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID")
|
|
|
|
github_app_private_key = get_from_dict_or_env(
|
|
values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY"
|
|
)
|
|
|
|
try:
|
|
from github import Auth, GithubIntegration
|
|
|
|
except ImportError:
|
|
raise ImportError(
|
|
"PyGithub is not installed. "
|
|
"Please install it with `pip install PyGithub`"
|
|
)
|
|
|
|
try:
|
|
# interpret the key as a file path
|
|
# fallback to interpreting as the key itself
|
|
with open(github_app_private_key, "r") as f:
|
|
private_key = f.read()
|
|
except Exception:
|
|
private_key = github_app_private_key
|
|
|
|
auth = Auth.AppAuth(
|
|
github_app_id,
|
|
private_key,
|
|
)
|
|
gi = GithubIntegration(auth=auth)
|
|
installation = gi.get_installations()
|
|
if not installation:
|
|
raise ValueError(
|
|
f"Please make sure to install the created github app with id "
|
|
f"{github_app_id} on the repo: {github_repository}"
|
|
"More instructions can be found at "
|
|
"https://docs.github.com/en/apps/using-"
|
|
"github-apps/installing-your-own-github-app"
|
|
)
|
|
try:
|
|
installation = installation[0]
|
|
except ValueError as e:
|
|
raise ValueError(
|
|
"Please make sure to give correct github parameters "
|
|
f"Error message: {e}"
|
|
)
|
|
# create a GitHub instance:
|
|
g = installation.get_github_for_installation()
|
|
repo = g.get_repo(github_repository)
|
|
|
|
github_base_branch = get_from_dict_or_env(
|
|
values,
|
|
"github_base_branch",
|
|
"GITHUB_BASE_BRANCH",
|
|
default=repo.default_branch,
|
|
)
|
|
|
|
active_branch = get_from_dict_or_env(
|
|
values,
|
|
"active_branch",
|
|
"ACTIVE_BRANCH",
|
|
default=repo.default_branch,
|
|
)
|
|
|
|
values["github"] = g
|
|
values["github_repo_instance"] = repo
|
|
values["github_repository"] = github_repository
|
|
values["github_app_id"] = github_app_id
|
|
values["github_app_private_key"] = github_app_private_key
|
|
values["active_branch"] = active_branch
|
|
values["github_base_branch"] = github_base_branch
|
|
|
|
return values
|
|
|
|
def parse_issues(self, issues: List[Issue]) -> List[dict]:
|
|
"""
|
|
Extracts title and number from each Issue and puts them in a dictionary
|
|
Parameters:
|
|
issues(List[Issue]): A list of Github Issue objects
|
|
Returns:
|
|
List[dict]: A dictionary of issue titles and numbers
|
|
"""
|
|
parsed = []
|
|
for issue in issues:
|
|
title = issue.title
|
|
number = issue.number
|
|
opened_by = issue.user.login if issue.user else None
|
|
issue_dict = {"title": title, "number": number}
|
|
if opened_by is not None:
|
|
issue_dict["opened_by"] = opened_by
|
|
parsed.append(issue_dict)
|
|
return parsed
|
|
|
|
def parse_pull_requests(self, pull_requests: List[PullRequest]) -> List[dict]:
|
|
"""
|
|
Extracts title and number from each Issue and puts them in a dictionary
|
|
Parameters:
|
|
issues(List[Issue]): A list of Github Issue objects
|
|
Returns:
|
|
List[dict]: A dictionary of issue titles and numbers
|
|
"""
|
|
parsed = []
|
|
for pr in pull_requests:
|
|
parsed.append(
|
|
{
|
|
"title": pr.title,
|
|
"number": pr.number,
|
|
"commits": str(pr.commits),
|
|
"comments": str(pr.comments),
|
|
}
|
|
)
|
|
return parsed
|
|
|
|
def get_issues(self) -> str:
|
|
"""
|
|
Fetches all open issues from the repo excluding pull requests
|
|
|
|
Returns:
|
|
str: A plaintext report containing the number of issues
|
|
and each issue's title and number.
|
|
"""
|
|
issues = self.github_repo_instance.get_issues(state="open")
|
|
# Filter out pull requests (part of GH issues object)
|
|
issues = [issue for issue in issues if not issue.pull_request]
|
|
if issues:
|
|
parsed_issues = self.parse_issues(issues)
|
|
parsed_issues_str = (
|
|
"Found " + str(len(parsed_issues)) + " issues:\n" + str(parsed_issues)
|
|
)
|
|
return parsed_issues_str
|
|
else:
|
|
return "No open issues available"
|
|
|
|
def list_open_pull_requests(self) -> str:
|
|
"""
|
|
Fetches all open PRs from the repo
|
|
|
|
Returns:
|
|
str: A plaintext report containing the number of PRs
|
|
and each PR's title and number.
|
|
"""
|
|
# issues = self.github_repo_instance.get_issues(state="open")
|
|
pull_requests = self.github_repo_instance.get_pulls(state="open")
|
|
if pull_requests.totalCount > 0:
|
|
parsed_prs = self.parse_pull_requests(pull_requests)
|
|
parsed_prs_str = (
|
|
"Found " + str(len(parsed_prs)) + " pull requests:\n" + str(parsed_prs)
|
|
)
|
|
return parsed_prs_str
|
|
else:
|
|
return "No open pull requests available"
|
|
|
|
def list_files_in_main_branch(self) -> str:
|
|
"""
|
|
Fetches all files in the main branch of the repo.
|
|
|
|
Returns:
|
|
str: A plaintext report containing the paths and names of the files.
|
|
"""
|
|
files: List[str] = []
|
|
try:
|
|
contents = self.github_repo_instance.get_contents(
|
|
"", ref=self.github_base_branch
|
|
)
|
|
for content in contents:
|
|
if content.type == "dir":
|
|
files.extend(self.get_files_from_directory(content.path))
|
|
else:
|
|
files.append(content.path)
|
|
|
|
if files:
|
|
files_str = "\n".join(files)
|
|
return f"Found {len(files)} files in the main branch:\n{files_str}"
|
|
else:
|
|
return "No files found in the main branch"
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
def set_active_branch(self, branch_name: str) -> str:
|
|
"""Equivalent to `git checkout branch_name` for this Agent.
|
|
Clones formatting from Github.
|
|
|
|
Returns an Error (as a string) if branch doesn't exist.
|
|
"""
|
|
curr_branches = [
|
|
branch.name for branch in self.github_repo_instance.get_branches()
|
|
]
|
|
if branch_name in curr_branches:
|
|
self.active_branch = branch_name
|
|
return f"Switched to branch `{branch_name}`"
|
|
else:
|
|
return (
|
|
f"Error {branch_name} does not exist,"
|
|
f"in repo with current branches: {str(curr_branches)}"
|
|
)
|
|
|
|
def list_branches_in_repo(self) -> str:
|
|
"""
|
|
Fetches a list of all branches in the repository.
|
|
|
|
Returns:
|
|
str: A plaintext report containing the names of the branches.
|
|
"""
|
|
try:
|
|
branches = [
|
|
branch.name for branch in self.github_repo_instance.get_branches()
|
|
]
|
|
if branches:
|
|
branches_str = "\n".join(branches)
|
|
return (
|
|
f"Found {len(branches)} branches in the repository:"
|
|
f"\n{branches_str}"
|
|
)
|
|
else:
|
|
return "No branches found in the repository"
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
def create_branch(self, proposed_branch_name: str) -> str:
|
|
"""
|
|
Create a new branch, and set it as the active bot branch.
|
|
Equivalent to `git switch -c proposed_branch_name`
|
|
If the proposed branch already exists, we append _v1 then _v2...
|
|
until a unique name is found.
|
|
|
|
Returns:
|
|
str: A plaintext success message.
|
|
"""
|
|
from github import GithubException
|
|
|
|
i = 0
|
|
new_branch_name = proposed_branch_name
|
|
base_branch = self.github_repo_instance.get_branch(
|
|
self.github_repo_instance.default_branch
|
|
)
|
|
for i in range(1000):
|
|
try:
|
|
self.github_repo_instance.create_git_ref(
|
|
ref=f"refs/heads/{new_branch_name}", sha=base_branch.commit.sha
|
|
)
|
|
self.active_branch = new_branch_name
|
|
return (
|
|
f"Branch '{new_branch_name}' "
|
|
"created successfully, and set as current active branch."
|
|
)
|
|
except GithubException as e:
|
|
if e.status == 422 and "Reference already exists" in e.data["message"]:
|
|
i += 1
|
|
new_branch_name = f"{proposed_branch_name}_v{i}"
|
|
else:
|
|
# Handle any other exceptions
|
|
print(f"Failed to create branch. Error: {e}") # noqa: T201
|
|
raise Exception(
|
|
"Unable to create branch name from proposed_branch_name: "
|
|
f"{proposed_branch_name}"
|
|
)
|
|
return (
|
|
"Unable to create branch. "
|
|
"At least 1000 branches exist with named derived from "
|
|
f"proposed_branch_name: `{proposed_branch_name}`"
|
|
)
|
|
|
|
def list_files_in_bot_branch(self) -> str:
|
|
"""
|
|
Fetches all files in the active branch of the repo,
|
|
the branch the bot uses to make changes.
|
|
|
|
Returns:
|
|
str: A plaintext list containing the the filepaths in the branch.
|
|
"""
|
|
files: List[str] = []
|
|
try:
|
|
contents = self.github_repo_instance.get_contents(
|
|
"", ref=self.active_branch
|
|
)
|
|
for content in contents:
|
|
if content.type == "dir":
|
|
files.extend(self.get_files_from_directory(content.path))
|
|
else:
|
|
files.append(content.path)
|
|
|
|
if files:
|
|
files_str = "\n".join(files)
|
|
return (
|
|
f"Found {len(files)} files in branch `{self.active_branch}`:\n"
|
|
f"{files_str}"
|
|
)
|
|
else:
|
|
return f"No files found in branch: `{self.active_branch}`"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def get_files_from_directory(self, directory_path: str) -> str:
|
|
"""
|
|
Recursively fetches files from a directory in the repo.
|
|
|
|
Parameters:
|
|
directory_path (str): Path to the directory
|
|
|
|
Returns:
|
|
str: List of file paths, or an error message.
|
|
"""
|
|
from github import GithubException
|
|
|
|
files: List[str] = []
|
|
try:
|
|
contents = self.github_repo_instance.get_contents(
|
|
directory_path, ref=self.active_branch
|
|
)
|
|
except GithubException as e:
|
|
return f"Error: status code {e.status}, {e.message}"
|
|
|
|
for content in contents:
|
|
if content.type == "dir":
|
|
files.extend(self.get_files_from_directory(content.path))
|
|
else:
|
|
files.append(content.path)
|
|
return str(files)
|
|
|
|
def get_issue(self, issue_number: int) -> Dict[str, Any]:
|
|
"""
|
|
Fetches a specific issue and its first 10 comments
|
|
Parameters:
|
|
issue_number(int): The number for the github issue
|
|
Returns:
|
|
dict: A dictionary containing the issue's title,
|
|
body, comments as a string, and the username of the user
|
|
who opened the issue
|
|
"""
|
|
issue = self.github_repo_instance.get_issue(number=issue_number)
|
|
page = 0
|
|
comments: List[dict] = []
|
|
while len(comments) <= 10:
|
|
comments_page = issue.get_comments().get_page(page)
|
|
if len(comments_page) == 0:
|
|
break
|
|
for comment in comments_page:
|
|
comments.append({"body": comment.body, "user": comment.user.login})
|
|
page += 1
|
|
|
|
opened_by = None
|
|
if issue.user and issue.user.login:
|
|
opened_by = issue.user.login
|
|
|
|
return {
|
|
"number": issue_number,
|
|
"title": issue.title,
|
|
"body": issue.body,
|
|
"comments": str(comments),
|
|
"opened_by": str(opened_by),
|
|
}
|
|
|
|
def list_pull_request_files(self, pr_number: int) -> List[Dict[str, Any]]:
|
|
"""Fetches the full text of all files in a PR. Truncates after first 3k tokens.
|
|
# TODO: Enhancement to summarize files with ctags if they're getting long.
|
|
|
|
Args:
|
|
pr_number(int): The number of the pull request on Github
|
|
|
|
Returns:
|
|
dict: A dictionary containing the issue's title,
|
|
body, and comments as a string
|
|
"""
|
|
tiktoken = _import_tiktoken()
|
|
MAX_TOKENS_FOR_FILES = 3_000
|
|
pr_files = []
|
|
pr = self.github_repo_instance.get_pull(number=int(pr_number))
|
|
total_tokens = 0
|
|
page = 0
|
|
while True: # or while (total_tokens + tiktoken()) < MAX_TOKENS_FOR_FILES:
|
|
files_page = pr.get_files().get_page(page)
|
|
if len(files_page) == 0:
|
|
break
|
|
for file in files_page:
|
|
try:
|
|
file_metadata_response = requests.get(file.contents_url)
|
|
if file_metadata_response.status_code == 200:
|
|
download_url = json.loads(file_metadata_response.text)[
|
|
"download_url"
|
|
]
|
|
else:
|
|
print(f"Failed to download file: {file.contents_url}, skipping") # noqa: T201
|
|
continue
|
|
|
|
file_content_response = requests.get(download_url)
|
|
if file_content_response.status_code == 200:
|
|
# Save the content as a UTF-8 string
|
|
file_content = file_content_response.text
|
|
else:
|
|
print( # noqa: T201
|
|
"Failed downloading file content "
|
|
f"(Error {file_content_response.status_code}). Skipping"
|
|
)
|
|
continue
|
|
|
|
file_tokens = len(
|
|
tiktoken.get_encoding("cl100k_base").encode(
|
|
file_content + file.filename + "file_name file_contents"
|
|
)
|
|
)
|
|
if (total_tokens + file_tokens) < MAX_TOKENS_FOR_FILES:
|
|
pr_files.append(
|
|
{
|
|
"filename": file.filename,
|
|
"contents": file_content,
|
|
"additions": file.additions,
|
|
"deletions": file.deletions,
|
|
}
|
|
)
|
|
total_tokens += file_tokens
|
|
except Exception as e:
|
|
print(f"Error when reading files from a PR on github. {e}") # noqa: T201
|
|
page += 1
|
|
return pr_files
|
|
|
|
def get_pull_request(self, pr_number: int) -> Dict[str, Any]:
|
|
"""
|
|
Fetches a specific pull request and its first 10 comments,
|
|
limited by max_tokens.
|
|
|
|
Parameters:
|
|
pr_number(int): The number for the Github pull
|
|
max_tokens(int): The maximum number of tokens in the response
|
|
Returns:
|
|
dict: A dictionary containing the pull's title, body,
|
|
and comments as a string
|
|
"""
|
|
max_tokens = 2_000
|
|
pull = self.github_repo_instance.get_pull(number=pr_number)
|
|
total_tokens = 0
|
|
|
|
def get_tokens(text: str) -> int:
|
|
tiktoken = _import_tiktoken()
|
|
return len(tiktoken.get_encoding("cl100k_base").encode(text))
|
|
|
|
def add_to_dict(data_dict: Dict[str, Any], key: str, value: str) -> None:
|
|
nonlocal total_tokens # Declare total_tokens as nonlocal
|
|
tokens = get_tokens(value)
|
|
if total_tokens + tokens <= max_tokens:
|
|
data_dict[key] = value
|
|
total_tokens += tokens # Now this will modify the outer variable
|
|
|
|
response_dict: Dict[str, str] = {}
|
|
add_to_dict(response_dict, "title", pull.title)
|
|
add_to_dict(response_dict, "number", str(pr_number))
|
|
add_to_dict(response_dict, "body", pull.body)
|
|
|
|
comments: List[str] = []
|
|
page = 0
|
|
while len(comments) <= 10:
|
|
comments_page = pull.get_issue_comments().get_page(page)
|
|
if len(comments_page) == 0:
|
|
break
|
|
for comment in comments_page:
|
|
comment_str = str({"body": comment.body, "user": comment.user.login})
|
|
if total_tokens + get_tokens(comment_str) > max_tokens:
|
|
break
|
|
comments.append(comment_str)
|
|
total_tokens += get_tokens(comment_str)
|
|
page += 1
|
|
add_to_dict(response_dict, "comments", str(comments))
|
|
|
|
commits: List[str] = []
|
|
page = 0
|
|
while len(commits) <= 10:
|
|
commits_page = pull.get_commits().get_page(page)
|
|
if len(commits_page) == 0:
|
|
break
|
|
for commit in commits_page:
|
|
commit_str = str({"message": commit.commit.message})
|
|
if total_tokens + get_tokens(commit_str) > max_tokens:
|
|
break
|
|
commits.append(commit_str)
|
|
total_tokens += get_tokens(commit_str)
|
|
page += 1
|
|
add_to_dict(response_dict, "commits", str(commits))
|
|
return response_dict
|
|
|
|
def create_pull_request(self, pr_query: str) -> str:
|
|
"""
|
|
Makes a pull request from the bot's branch to the base branch
|
|
Parameters:
|
|
pr_query(str): a string which contains the PR title
|
|
and the PR body. The title is the first line
|
|
in the string, and the body are the rest of the string.
|
|
For example, "Updated README\nmade changes to add info"
|
|
Returns:
|
|
str: A success or failure message
|
|
"""
|
|
if self.github_base_branch == self.active_branch:
|
|
return """Cannot make a pull request because
|
|
commits are already in the main or master branch."""
|
|
else:
|
|
try:
|
|
title = pr_query.split("\n")[0]
|
|
body = pr_query[len(title) + 2 :]
|
|
pr = self.github_repo_instance.create_pull(
|
|
title=title,
|
|
body=body,
|
|
head=self.active_branch,
|
|
base=self.github_base_branch,
|
|
)
|
|
return f"Successfully created PR number {pr.number}"
|
|
except Exception as e:
|
|
return "Unable to make pull request due to error:\n" + str(e)
|
|
|
|
def comment_on_issue(self, comment_query: str) -> str:
|
|
"""
|
|
Adds a comment to a github issue
|
|
Parameters:
|
|
comment_query(str): a string which contains the issue number,
|
|
two newlines, and the comment.
|
|
for example: "1\n\nWorking on it now"
|
|
adds the comment "working on it now" to issue 1
|
|
Returns:
|
|
str: A success or failure message
|
|
"""
|
|
issue_number = int(comment_query.split("\n\n")[0])
|
|
comment = comment_query[len(str(issue_number)) + 2 :]
|
|
try:
|
|
issue = self.github_repo_instance.get_issue(number=issue_number)
|
|
issue.create_comment(comment)
|
|
return "Commented on issue " + str(issue_number)
|
|
except Exception as e:
|
|
return "Unable to make comment due to error:\n" + str(e)
|
|
|
|
def create_file(self, file_query: str) -> str:
|
|
"""
|
|
Creates a new file on the Github repo
|
|
Parameters:
|
|
file_query(str): a string which contains the file path
|
|
and the file contents. The file path is the first line
|
|
in the string, and the contents are the rest of the string.
|
|
For example, "hello_world.md\n# Hello World!"
|
|
Returns:
|
|
str: A success or failure message
|
|
"""
|
|
if self.active_branch == self.github_base_branch:
|
|
return (
|
|
"You're attempting to commit to the directly to the"
|
|
f"{self.github_base_branch} branch, which is protected. "
|
|
"Please create a new branch and try again."
|
|
)
|
|
|
|
file_path = file_query.split("\n")[0]
|
|
file_contents = file_query[len(file_path) + 2 :]
|
|
|
|
try:
|
|
try:
|
|
file = self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
)
|
|
if file:
|
|
return (
|
|
f"File already exists at `{file_path}` "
|
|
f"on branch `{self.active_branch}`. You must use "
|
|
"`update_file` to modify it."
|
|
)
|
|
except Exception:
|
|
# expected behavior, file shouldn't exist yet
|
|
pass
|
|
|
|
self.github_repo_instance.create_file(
|
|
path=file_path,
|
|
message="Create " + file_path,
|
|
content=file_contents,
|
|
branch=self.active_branch,
|
|
)
|
|
return "Created file " + file_path
|
|
except Exception as e:
|
|
return "Unable to make file due to error:\n" + str(e)
|
|
|
|
def read_file(self, file_path: str) -> str:
|
|
"""
|
|
Read a file from this agent's branch, defined by self.active_branch,
|
|
which supports PR branches.
|
|
Parameters:
|
|
file_path(str): the file path
|
|
Returns:
|
|
str: The file decoded as a string, or an error message if not found
|
|
"""
|
|
try:
|
|
file = self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
)
|
|
return file.decoded_content.decode("utf-8")
|
|
except Exception as e:
|
|
return (
|
|
f"File not found `{file_path}` on branch"
|
|
f"`{self.active_branch}`. Error: {str(e)}"
|
|
)
|
|
|
|
def update_file(self, file_query: str) -> str:
|
|
"""
|
|
Updates a file with new content.
|
|
Parameters:
|
|
file_query(str): Contains the file path and the file contents.
|
|
The old file contents is wrapped in OLD <<<< and >>>> OLD
|
|
The new file contents is wrapped in NEW <<<< and >>>> NEW
|
|
For example:
|
|
/test/hello.txt
|
|
OLD <<<<
|
|
Hello Earth!
|
|
>>>> OLD
|
|
NEW <<<<
|
|
Hello Mars!
|
|
>>>> NEW
|
|
Returns:
|
|
A success or failure message
|
|
"""
|
|
if self.active_branch == self.github_base_branch:
|
|
return (
|
|
"You're attempting to commit to the directly"
|
|
f"to the {self.github_base_branch} branch, which is protected. "
|
|
"Please create a new branch and try again."
|
|
)
|
|
try:
|
|
file_path: str = file_query.split("\n")[0]
|
|
old_file_contents = (
|
|
file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
|
|
)
|
|
new_file_contents = (
|
|
file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
|
|
)
|
|
|
|
file_content = self.read_file(file_path)
|
|
updated_file_content = file_content.replace(
|
|
old_file_contents, new_file_contents
|
|
)
|
|
|
|
if file_content == updated_file_content:
|
|
return (
|
|
"File content was not updated because old content was not found."
|
|
"It may be helpful to use the read_file action to get "
|
|
"the current file contents."
|
|
)
|
|
|
|
self.github_repo_instance.update_file(
|
|
path=file_path,
|
|
message="Update " + str(file_path),
|
|
content=updated_file_content,
|
|
branch=self.active_branch,
|
|
sha=self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
).sha,
|
|
)
|
|
return "Updated file " + str(file_path)
|
|
except Exception as e:
|
|
return "Unable to update file due to error:\n" + str(e)
|
|
|
|
def delete_file(self, file_path: str) -> str:
|
|
"""
|
|
Deletes a file from the repo
|
|
Parameters:
|
|
file_path(str): Where the file is
|
|
Returns:
|
|
str: Success or failure message
|
|
"""
|
|
if self.active_branch == self.github_base_branch:
|
|
return (
|
|
"You're attempting to commit to the directly"
|
|
f"to the {self.github_base_branch} branch, which is protected. "
|
|
"Please create a new branch and try again."
|
|
)
|
|
try:
|
|
self.github_repo_instance.delete_file(
|
|
path=file_path,
|
|
message="Delete " + file_path,
|
|
branch=self.active_branch,
|
|
sha=self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
).sha,
|
|
)
|
|
return "Deleted file " + file_path
|
|
except Exception as e:
|
|
return "Unable to delete file due to error:\n" + str(e)
|
|
|
|
def search_issues_and_prs(self, query: str) -> str:
|
|
"""
|
|
Searches issues and pull requests in the repository.
|
|
|
|
Parameters:
|
|
query(str): The search query
|
|
|
|
Returns:
|
|
str: A string containing the first 5 issues and pull requests
|
|
"""
|
|
search_result = self.github.search_issues(query, repo=self.github_repository)
|
|
max_items = min(5, search_result.totalCount)
|
|
results = [f"Top {max_items} results:"]
|
|
for issue in search_result[:max_items]:
|
|
results.append(
|
|
f"Title: {issue.title}, Number: {issue.number}, State: {issue.state}"
|
|
)
|
|
return "\n".join(results)
|
|
|
|
def search_code(self, query: str) -> str:
|
|
"""
|
|
Searches code in the repository.
|
|
# Todo: limit total tokens returned...
|
|
|
|
Parameters:
|
|
query(str): The search query
|
|
|
|
Returns:
|
|
str: A string containing, at most, the top 5 search results
|
|
"""
|
|
search_result = self.github.search_code(
|
|
query=query, repo=self.github_repository
|
|
)
|
|
if search_result.totalCount == 0:
|
|
return "0 results found."
|
|
max_results = min(5, search_result.totalCount)
|
|
results = [f"Showing top {max_results} of {search_result.totalCount} results:"]
|
|
count = 0
|
|
for code in search_result:
|
|
if count >= max_results:
|
|
break
|
|
# Get the file content using the PyGithub get_contents method
|
|
file_content = self.github_repo_instance.get_contents(
|
|
code.path, ref=self.active_branch
|
|
).decoded_content.decode()
|
|
results.append(
|
|
f"Filepath: `{code.path}`\nFile contents: "
|
|
f"{file_content}\n<END OF FILE>"
|
|
)
|
|
count += 1
|
|
return "\n".join(results)
|
|
|
|
def create_review_request(self, reviewer_username: str) -> str:
|
|
"""
|
|
Creates a review request on *THE* open pull request
|
|
that matches the current active_branch.
|
|
|
|
Parameters:
|
|
reviewer_username(str): The username of the person who is being requested
|
|
|
|
Returns:
|
|
str: A message confirming the creation of the review request
|
|
"""
|
|
pull_requests = self.github_repo_instance.get_pulls(
|
|
state="open", sort="created"
|
|
)
|
|
# find PR against active_branch
|
|
pr = next(
|
|
(pr for pr in pull_requests if pr.head.ref == self.active_branch), None
|
|
)
|
|
if pr is None:
|
|
return (
|
|
"No open pull request found for the "
|
|
f"current branch `{self.active_branch}`"
|
|
)
|
|
|
|
try:
|
|
pr.create_review_request(reviewers=[reviewer_username])
|
|
return (
|
|
f"Review request created for user {reviewer_username} "
|
|
f"on PR #{pr.number}"
|
|
)
|
|
except Exception as e:
|
|
return f"Failed to create a review request with error {e}"
|
|
|
|
def run(self, mode: str, query: str) -> str:
|
|
if mode == "get_issue":
|
|
return json.dumps(self.get_issue(int(query)))
|
|
elif mode == "get_pull_request":
|
|
return json.dumps(self.get_pull_request(int(query)))
|
|
elif mode == "list_pull_request_files":
|
|
return json.dumps(self.list_pull_request_files(int(query)))
|
|
elif mode == "get_issues":
|
|
return self.get_issues()
|
|
elif mode == "comment_on_issue":
|
|
return self.comment_on_issue(query)
|
|
elif mode == "create_file":
|
|
return self.create_file(query)
|
|
elif mode == "create_pull_request":
|
|
return self.create_pull_request(query)
|
|
elif mode == "read_file":
|
|
return self.read_file(query)
|
|
elif mode == "update_file":
|
|
return self.update_file(query)
|
|
elif mode == "delete_file":
|
|
return self.delete_file(query)
|
|
elif mode == "list_open_pull_requests":
|
|
return self.list_open_pull_requests()
|
|
elif mode == "list_files_in_main_branch":
|
|
return self.list_files_in_main_branch()
|
|
elif mode == "list_files_in_bot_branch":
|
|
return self.list_files_in_bot_branch()
|
|
elif mode == "list_branches_in_repo":
|
|
return self.list_branches_in_repo()
|
|
elif mode == "set_active_branch":
|
|
return self.set_active_branch(query)
|
|
elif mode == "create_branch":
|
|
return self.create_branch(query)
|
|
elif mode == "get_files_from_directory":
|
|
return self.get_files_from_directory(query)
|
|
elif mode == "search_issues_and_prs":
|
|
return self.search_issues_and_prs(query)
|
|
elif mode == "search_code":
|
|
return self.search_code(query)
|
|
elif mode == "create_review_request":
|
|
return self.create_review_request(query)
|
|
else:
|
|
raise ValueError("Invalid mode" + mode)
|