mirror of
https://github.com/hwchase17/langchain
synced 2024-11-10 01:10:59 +00:00
fea4888e72
- **Description:** The Github error prompt is confused because of JWT enctrypt to somebody not familiar with Github connection method. This PR is to add some useful error prompt to help users troubleshooting. - **Issue:** https://github.com/langchain-ai/langchain/issues/14550#issuecomment-1867445049 - **Dependencies:** None, - **Twitter handle:** None
854 lines
31 KiB
Python
854 lines
31 KiB
Python
"""Util that calls GitHub."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
|
|
|
import requests
|
|
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
|
|
from langchain_core.utils import get_from_dict_or_env
|
|
|
|
if TYPE_CHECKING:
|
|
from github.Issue import Issue
|
|
from github.PullRequest import PullRequest
|
|
|
|
|
|
def _import_tiktoken() -> Any:
|
|
"""Import tiktoken."""
|
|
try:
|
|
import tiktoken
|
|
except ImportError:
|
|
raise ImportError(
|
|
"tiktoken is not installed. "
|
|
"Please install it with `pip install tiktoken`"
|
|
)
|
|
return tiktoken
|
|
|
|
|
|
class GitHubAPIWrapper(BaseModel):
|
|
"""Wrapper for GitHub API."""
|
|
|
|
github: Any #: :meta private:
|
|
github_repo_instance: Any #: :meta private:
|
|
github_repository: Optional[str] = None
|
|
github_app_id: Optional[str] = None
|
|
github_app_private_key: Optional[str] = None
|
|
active_branch: Optional[str] = None
|
|
github_base_branch: Optional[str] = None
|
|
|
|
class Config:
|
|
"""Configuration for this pydantic object."""
|
|
|
|
extra = Extra.forbid
|
|
|
|
@root_validator()
|
|
def validate_environment(cls, values: Dict) -> Dict:
|
|
"""Validate that api key and python package exists in environment."""
|
|
github_repository = get_from_dict_or_env(
|
|
values, "github_repository", "GITHUB_REPOSITORY"
|
|
)
|
|
|
|
github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID")
|
|
|
|
github_app_private_key = get_from_dict_or_env(
|
|
values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY"
|
|
)
|
|
|
|
try:
|
|
from github import Auth, GithubIntegration
|
|
|
|
except ImportError:
|
|
raise ImportError(
|
|
"PyGithub is not installed. "
|
|
"Please install it with `pip install PyGithub`"
|
|
)
|
|
|
|
try:
|
|
# interpret the key as a file path
|
|
# fallback to interpreting as the key itself
|
|
with open(github_app_private_key, "r") as f:
|
|
private_key = f.read()
|
|
except Exception:
|
|
private_key = github_app_private_key
|
|
|
|
auth = Auth.AppAuth(
|
|
github_app_id,
|
|
private_key,
|
|
)
|
|
gi = GithubIntegration(auth=auth)
|
|
installation = gi.get_installations()
|
|
if not installation:
|
|
raise ValueError(
|
|
f"Please make sure to install the created github app with id "
|
|
f"{github_app_id} on the repo: {github_repository}"
|
|
"More instructions can be found at "
|
|
"https://docs.github.com/en/apps/using-"
|
|
"github-apps/installing-your-own-github-app"
|
|
)
|
|
try:
|
|
installation = installation[0]
|
|
except ValueError as e:
|
|
raise ValueError(
|
|
"Please make sure to give correct github parameters "
|
|
f"Error message: {e}"
|
|
)
|
|
# create a GitHub instance:
|
|
g = installation.get_github_for_installation()
|
|
repo = g.get_repo(github_repository)
|
|
|
|
github_base_branch = get_from_dict_or_env(
|
|
values,
|
|
"github_base_branch",
|
|
"GITHUB_BASE_BRANCH",
|
|
default=repo.default_branch,
|
|
)
|
|
|
|
active_branch = get_from_dict_or_env(
|
|
values,
|
|
"active_branch",
|
|
"ACTIVE_BRANCH",
|
|
default=repo.default_branch,
|
|
)
|
|
|
|
values["github"] = g
|
|
values["github_repo_instance"] = repo
|
|
values["github_repository"] = github_repository
|
|
values["github_app_id"] = github_app_id
|
|
values["github_app_private_key"] = github_app_private_key
|
|
values["active_branch"] = active_branch
|
|
values["github_base_branch"] = github_base_branch
|
|
|
|
return values
|
|
|
|
def parse_issues(self, issues: List[Issue]) -> List[dict]:
|
|
"""
|
|
Extracts title and number from each Issue and puts them in a dictionary
|
|
Parameters:
|
|
issues(List[Issue]): A list of Github Issue objects
|
|
Returns:
|
|
List[dict]: A dictionary of issue titles and numbers
|
|
"""
|
|
parsed = []
|
|
for issue in issues:
|
|
title = issue.title
|
|
number = issue.number
|
|
opened_by = issue.user.login if issue.user else None
|
|
issue_dict = {"title": title, "number": number}
|
|
if opened_by is not None:
|
|
issue_dict["opened_by"] = opened_by
|
|
parsed.append(issue_dict)
|
|
return parsed
|
|
|
|
def parse_pull_requests(self, pull_requests: List[PullRequest]) -> List[dict]:
|
|
"""
|
|
Extracts title and number from each Issue and puts them in a dictionary
|
|
Parameters:
|
|
issues(List[Issue]): A list of Github Issue objects
|
|
Returns:
|
|
List[dict]: A dictionary of issue titles and numbers
|
|
"""
|
|
parsed = []
|
|
for pr in pull_requests:
|
|
parsed.append(
|
|
{
|
|
"title": pr.title,
|
|
"number": pr.number,
|
|
"commits": str(pr.commits),
|
|
"comments": str(pr.comments),
|
|
}
|
|
)
|
|
return parsed
|
|
|
|
def get_issues(self) -> str:
|
|
"""
|
|
Fetches all open issues from the repo excluding pull requests
|
|
|
|
Returns:
|
|
str: A plaintext report containing the number of issues
|
|
and each issue's title and number.
|
|
"""
|
|
issues = self.github_repo_instance.get_issues(state="open")
|
|
# Filter out pull requests (part of GH issues object)
|
|
issues = [issue for issue in issues if not issue.pull_request]
|
|
if issues:
|
|
parsed_issues = self.parse_issues(issues)
|
|
parsed_issues_str = (
|
|
"Found " + str(len(parsed_issues)) + " issues:\n" + str(parsed_issues)
|
|
)
|
|
return parsed_issues_str
|
|
else:
|
|
return "No open issues available"
|
|
|
|
def list_open_pull_requests(self) -> str:
|
|
"""
|
|
Fetches all open PRs from the repo
|
|
|
|
Returns:
|
|
str: A plaintext report containing the number of PRs
|
|
and each PR's title and number.
|
|
"""
|
|
# issues = self.github_repo_instance.get_issues(state="open")
|
|
pull_requests = self.github_repo_instance.get_pulls(state="open")
|
|
if pull_requests.totalCount > 0:
|
|
parsed_prs = self.parse_pull_requests(pull_requests)
|
|
parsed_prs_str = (
|
|
"Found " + str(len(parsed_prs)) + " pull requests:\n" + str(parsed_prs)
|
|
)
|
|
return parsed_prs_str
|
|
else:
|
|
return "No open pull requests available"
|
|
|
|
def list_files_in_main_branch(self) -> str:
|
|
"""
|
|
Fetches all files in the main branch of the repo.
|
|
|
|
Returns:
|
|
str: A plaintext report containing the paths and names of the files.
|
|
"""
|
|
files: List[str] = []
|
|
try:
|
|
contents = self.github_repo_instance.get_contents(
|
|
"", ref=self.github_base_branch
|
|
)
|
|
for content in contents:
|
|
if content.type == "dir":
|
|
files.extend(self.get_files_from_directory(content.path))
|
|
else:
|
|
files.append(content.path)
|
|
|
|
if files:
|
|
files_str = "\n".join(files)
|
|
return f"Found {len(files)} files in the main branch:\n{files_str}"
|
|
else:
|
|
return "No files found in the main branch"
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
def set_active_branch(self, branch_name: str) -> str:
|
|
"""Equivalent to `git checkout branch_name` for this Agent.
|
|
Clones formatting from Github.
|
|
|
|
Returns an Error (as a string) if branch doesn't exist.
|
|
"""
|
|
curr_branches = [
|
|
branch.name for branch in self.github_repo_instance.get_branches()
|
|
]
|
|
if branch_name in curr_branches:
|
|
self.active_branch = branch_name
|
|
return f"Switched to branch `{branch_name}`"
|
|
else:
|
|
return (
|
|
f"Error {branch_name} does not exist,"
|
|
f"in repo with current branches: {str(curr_branches)}"
|
|
)
|
|
|
|
def list_branches_in_repo(self) -> str:
|
|
"""
|
|
Fetches a list of all branches in the repository.
|
|
|
|
Returns:
|
|
str: A plaintext report containing the names of the branches.
|
|
"""
|
|
try:
|
|
branches = [
|
|
branch.name for branch in self.github_repo_instance.get_branches()
|
|
]
|
|
if branches:
|
|
branches_str = "\n".join(branches)
|
|
return (
|
|
f"Found {len(branches)} branches in the repository:"
|
|
f"\n{branches_str}"
|
|
)
|
|
else:
|
|
return "No branches found in the repository"
|
|
except Exception as e:
|
|
return str(e)
|
|
|
|
def create_branch(self, proposed_branch_name: str) -> str:
|
|
"""
|
|
Create a new branch, and set it as the active bot branch.
|
|
Equivalent to `git switch -c proposed_branch_name`
|
|
If the proposed branch already exists, we append _v1 then _v2...
|
|
until a unique name is found.
|
|
|
|
Returns:
|
|
str: A plaintext success message.
|
|
"""
|
|
from github import GithubException
|
|
|
|
i = 0
|
|
new_branch_name = proposed_branch_name
|
|
base_branch = self.github_repo_instance.get_branch(
|
|
self.github_repo_instance.default_branch
|
|
)
|
|
for i in range(1000):
|
|
try:
|
|
self.github_repo_instance.create_git_ref(
|
|
ref=f"refs/heads/{new_branch_name}", sha=base_branch.commit.sha
|
|
)
|
|
self.active_branch = new_branch_name
|
|
return (
|
|
f"Branch '{new_branch_name}' "
|
|
"created successfully, and set as current active branch."
|
|
)
|
|
except GithubException as e:
|
|
if e.status == 422 and "Reference already exists" in e.data["message"]:
|
|
i += 1
|
|
new_branch_name = f"{proposed_branch_name}_v{i}"
|
|
else:
|
|
# Handle any other exceptions
|
|
print(f"Failed to create branch. Error: {e}")
|
|
raise Exception(
|
|
"Unable to create branch name from proposed_branch_name: "
|
|
f"{proposed_branch_name}"
|
|
)
|
|
return (
|
|
"Unable to create branch. "
|
|
"At least 1000 branches exist with named derived from "
|
|
f"proposed_branch_name: `{proposed_branch_name}`"
|
|
)
|
|
|
|
def list_files_in_bot_branch(self) -> str:
|
|
"""
|
|
Fetches all files in the active branch of the repo,
|
|
the branch the bot uses to make changes.
|
|
|
|
Returns:
|
|
str: A plaintext list containing the the filepaths in the branch.
|
|
"""
|
|
files: List[str] = []
|
|
try:
|
|
contents = self.github_repo_instance.get_contents(
|
|
"", ref=self.active_branch
|
|
)
|
|
for content in contents:
|
|
if content.type == "dir":
|
|
files.extend(self.get_files_from_directory(content.path))
|
|
else:
|
|
files.append(content.path)
|
|
|
|
if files:
|
|
files_str = "\n".join(files)
|
|
return (
|
|
f"Found {len(files)} files in branch `{self.active_branch}`:\n"
|
|
f"{files_str}"
|
|
)
|
|
else:
|
|
return f"No files found in branch: `{self.active_branch}`"
|
|
except Exception as e:
|
|
return f"Error: {e}"
|
|
|
|
def get_files_from_directory(self, directory_path: str) -> str:
|
|
"""
|
|
Recursively fetches files from a directory in the repo.
|
|
|
|
Parameters:
|
|
directory_path (str): Path to the directory
|
|
|
|
Returns:
|
|
str: List of file paths, or an error message.
|
|
"""
|
|
from github import GithubException
|
|
|
|
files: List[str] = []
|
|
try:
|
|
contents = self.github_repo_instance.get_contents(
|
|
directory_path, ref=self.active_branch
|
|
)
|
|
except GithubException as e:
|
|
return f"Error: status code {e.status}, {e.message}"
|
|
|
|
for content in contents:
|
|
if content.type == "dir":
|
|
files.extend(self.get_files_from_directory(content.path))
|
|
else:
|
|
files.append(content.path)
|
|
return str(files)
|
|
|
|
def get_issue(self, issue_number: int) -> Dict[str, Any]:
|
|
"""
|
|
Fetches a specific issue and its first 10 comments
|
|
Parameters:
|
|
issue_number(int): The number for the github issue
|
|
Returns:
|
|
dict: A dictionary containing the issue's title,
|
|
body, comments as a string, and the username of the user
|
|
who opened the issue
|
|
"""
|
|
issue = self.github_repo_instance.get_issue(number=issue_number)
|
|
page = 0
|
|
comments: List[dict] = []
|
|
while len(comments) <= 10:
|
|
comments_page = issue.get_comments().get_page(page)
|
|
if len(comments_page) == 0:
|
|
break
|
|
for comment in comments_page:
|
|
comments.append({"body": comment.body, "user": comment.user.login})
|
|
page += 1
|
|
|
|
opened_by = None
|
|
if issue.user and issue.user.login:
|
|
opened_by = issue.user.login
|
|
|
|
return {
|
|
"number": issue_number,
|
|
"title": issue.title,
|
|
"body": issue.body,
|
|
"comments": str(comments),
|
|
"opened_by": str(opened_by),
|
|
}
|
|
|
|
def list_pull_request_files(self, pr_number: int) -> List[Dict[str, Any]]:
|
|
"""Fetches the full text of all files in a PR. Truncates after first 3k tokens.
|
|
# TODO: Enhancement to summarize files with ctags if they're getting long.
|
|
|
|
Args:
|
|
pr_number(int): The number of the pull request on Github
|
|
|
|
Returns:
|
|
dict: A dictionary containing the issue's title,
|
|
body, and comments as a string
|
|
"""
|
|
tiktoken = _import_tiktoken()
|
|
MAX_TOKENS_FOR_FILES = 3_000
|
|
pr_files = []
|
|
pr = self.github_repo_instance.get_pull(number=int(pr_number))
|
|
total_tokens = 0
|
|
page = 0
|
|
while True: # or while (total_tokens + tiktoken()) < MAX_TOKENS_FOR_FILES:
|
|
files_page = pr.get_files().get_page(page)
|
|
if len(files_page) == 0:
|
|
break
|
|
for file in files_page:
|
|
try:
|
|
file_metadata_response = requests.get(file.contents_url)
|
|
if file_metadata_response.status_code == 200:
|
|
download_url = json.loads(file_metadata_response.text)[
|
|
"download_url"
|
|
]
|
|
else:
|
|
print(f"Failed to download file: {file.contents_url}, skipping")
|
|
continue
|
|
|
|
file_content_response = requests.get(download_url)
|
|
if file_content_response.status_code == 200:
|
|
# Save the content as a UTF-8 string
|
|
file_content = file_content_response.text
|
|
else:
|
|
print(
|
|
"Failed downloading file content "
|
|
f"(Error {file_content_response.status_code}). Skipping"
|
|
)
|
|
continue
|
|
|
|
file_tokens = len(
|
|
tiktoken.get_encoding("cl100k_base").encode(
|
|
file_content + file.filename + "file_name file_contents"
|
|
)
|
|
)
|
|
if (total_tokens + file_tokens) < MAX_TOKENS_FOR_FILES:
|
|
pr_files.append(
|
|
{
|
|
"filename": file.filename,
|
|
"contents": file_content,
|
|
"additions": file.additions,
|
|
"deletions": file.deletions,
|
|
}
|
|
)
|
|
total_tokens += file_tokens
|
|
except Exception as e:
|
|
print(f"Error when reading files from a PR on github. {e}")
|
|
page += 1
|
|
return pr_files
|
|
|
|
def get_pull_request(self, pr_number: int) -> Dict[str, Any]:
|
|
"""
|
|
Fetches a specific pull request and its first 10 comments,
|
|
limited by max_tokens.
|
|
|
|
Parameters:
|
|
pr_number(int): The number for the Github pull
|
|
max_tokens(int): The maximum number of tokens in the response
|
|
Returns:
|
|
dict: A dictionary containing the pull's title, body,
|
|
and comments as a string
|
|
"""
|
|
max_tokens = 2_000
|
|
pull = self.github_repo_instance.get_pull(number=pr_number)
|
|
total_tokens = 0
|
|
|
|
def get_tokens(text: str) -> int:
|
|
tiktoken = _import_tiktoken()
|
|
return len(tiktoken.get_encoding("cl100k_base").encode(text))
|
|
|
|
def add_to_dict(data_dict: Dict[str, Any], key: str, value: str) -> None:
|
|
nonlocal total_tokens # Declare total_tokens as nonlocal
|
|
tokens = get_tokens(value)
|
|
if total_tokens + tokens <= max_tokens:
|
|
data_dict[key] = value
|
|
total_tokens += tokens # Now this will modify the outer variable
|
|
|
|
response_dict: Dict[str, str] = {}
|
|
add_to_dict(response_dict, "title", pull.title)
|
|
add_to_dict(response_dict, "number", str(pr_number))
|
|
add_to_dict(response_dict, "body", pull.body)
|
|
|
|
comments: List[str] = []
|
|
page = 0
|
|
while len(comments) <= 10:
|
|
comments_page = pull.get_issue_comments().get_page(page)
|
|
if len(comments_page) == 0:
|
|
break
|
|
for comment in comments_page:
|
|
comment_str = str({"body": comment.body, "user": comment.user.login})
|
|
if total_tokens + get_tokens(comment_str) > max_tokens:
|
|
break
|
|
comments.append(comment_str)
|
|
total_tokens += get_tokens(comment_str)
|
|
page += 1
|
|
add_to_dict(response_dict, "comments", str(comments))
|
|
|
|
commits: List[str] = []
|
|
page = 0
|
|
while len(commits) <= 10:
|
|
commits_page = pull.get_commits().get_page(page)
|
|
if len(commits_page) == 0:
|
|
break
|
|
for commit in commits_page:
|
|
commit_str = str({"message": commit.commit.message})
|
|
if total_tokens + get_tokens(commit_str) > max_tokens:
|
|
break
|
|
commits.append(commit_str)
|
|
total_tokens += get_tokens(commit_str)
|
|
page += 1
|
|
add_to_dict(response_dict, "commits", str(commits))
|
|
return response_dict
|
|
|
|
def create_pull_request(self, pr_query: str) -> str:
|
|
"""
|
|
Makes a pull request from the bot's branch to the base branch
|
|
Parameters:
|
|
pr_query(str): a string which contains the PR title
|
|
and the PR body. The title is the first line
|
|
in the string, and the body are the rest of the string.
|
|
For example, "Updated README\nmade changes to add info"
|
|
Returns:
|
|
str: A success or failure message
|
|
"""
|
|
if self.github_base_branch == self.active_branch:
|
|
return """Cannot make a pull request because
|
|
commits are already in the main or master branch."""
|
|
else:
|
|
try:
|
|
title = pr_query.split("\n")[0]
|
|
body = pr_query[len(title) + 2 :]
|
|
pr = self.github_repo_instance.create_pull(
|
|
title=title,
|
|
body=body,
|
|
head=self.active_branch,
|
|
base=self.github_base_branch,
|
|
)
|
|
return f"Successfully created PR number {pr.number}"
|
|
except Exception as e:
|
|
return "Unable to make pull request due to error:\n" + str(e)
|
|
|
|
def comment_on_issue(self, comment_query: str) -> str:
|
|
"""
|
|
Adds a comment to a github issue
|
|
Parameters:
|
|
comment_query(str): a string which contains the issue number,
|
|
two newlines, and the comment.
|
|
for example: "1\n\nWorking on it now"
|
|
adds the comment "working on it now" to issue 1
|
|
Returns:
|
|
str: A success or failure message
|
|
"""
|
|
issue_number = int(comment_query.split("\n\n")[0])
|
|
comment = comment_query[len(str(issue_number)) + 2 :]
|
|
try:
|
|
issue = self.github_repo_instance.get_issue(number=issue_number)
|
|
issue.create_comment(comment)
|
|
return "Commented on issue " + str(issue_number)
|
|
except Exception as e:
|
|
return "Unable to make comment due to error:\n" + str(e)
|
|
|
|
def create_file(self, file_query: str) -> str:
|
|
"""
|
|
Creates a new file on the Github repo
|
|
Parameters:
|
|
file_query(str): a string which contains the file path
|
|
and the file contents. The file path is the first line
|
|
in the string, and the contents are the rest of the string.
|
|
For example, "hello_world.md\n# Hello World!"
|
|
Returns:
|
|
str: A success or failure message
|
|
"""
|
|
if self.active_branch == self.github_base_branch:
|
|
return (
|
|
"You're attempting to commit to the directly to the"
|
|
f"{self.github_base_branch} branch, which is protected. "
|
|
"Please create a new branch and try again."
|
|
)
|
|
|
|
file_path = file_query.split("\n")[0]
|
|
file_contents = file_query[len(file_path) + 2 :]
|
|
|
|
try:
|
|
try:
|
|
file = self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
)
|
|
if file:
|
|
return (
|
|
f"File already exists at `{file_path}` "
|
|
f"on branch `{self.active_branch}`. You must use "
|
|
"`update_file` to modify it."
|
|
)
|
|
except Exception:
|
|
# expected behavior, file shouldn't exist yet
|
|
pass
|
|
|
|
self.github_repo_instance.create_file(
|
|
path=file_path,
|
|
message="Create " + file_path,
|
|
content=file_contents,
|
|
branch=self.active_branch,
|
|
)
|
|
return "Created file " + file_path
|
|
except Exception as e:
|
|
return "Unable to make file due to error:\n" + str(e)
|
|
|
|
def read_file(self, file_path: str) -> str:
|
|
"""
|
|
Read a file from this agent's branch, defined by self.active_branch,
|
|
which supports PR branches.
|
|
Parameters:
|
|
file_path(str): the file path
|
|
Returns:
|
|
str: The file decoded as a string, or an error message if not found
|
|
"""
|
|
try:
|
|
file = self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
)
|
|
return file.decoded_content.decode("utf-8")
|
|
except Exception as e:
|
|
return (
|
|
f"File not found `{file_path}` on branch"
|
|
f"`{self.active_branch}`. Error: {str(e)}"
|
|
)
|
|
|
|
def update_file(self, file_query: str) -> str:
|
|
"""
|
|
Updates a file with new content.
|
|
Parameters:
|
|
file_query(str): Contains the file path and the file contents.
|
|
The old file contents is wrapped in OLD <<<< and >>>> OLD
|
|
The new file contents is wrapped in NEW <<<< and >>>> NEW
|
|
For example:
|
|
/test/hello.txt
|
|
OLD <<<<
|
|
Hello Earth!
|
|
>>>> OLD
|
|
NEW <<<<
|
|
Hello Mars!
|
|
>>>> NEW
|
|
Returns:
|
|
A success or failure message
|
|
"""
|
|
if self.active_branch == self.github_base_branch:
|
|
return (
|
|
"You're attempting to commit to the directly"
|
|
f"to the {self.github_base_branch} branch, which is protected. "
|
|
"Please create a new branch and try again."
|
|
)
|
|
try:
|
|
file_path: str = file_query.split("\n")[0]
|
|
old_file_contents = (
|
|
file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
|
|
)
|
|
new_file_contents = (
|
|
file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
|
|
)
|
|
|
|
file_content = self.read_file(file_path)
|
|
updated_file_content = file_content.replace(
|
|
old_file_contents, new_file_contents
|
|
)
|
|
|
|
if file_content == updated_file_content:
|
|
return (
|
|
"File content was not updated because old content was not found."
|
|
"It may be helpful to use the read_file action to get "
|
|
"the current file contents."
|
|
)
|
|
|
|
self.github_repo_instance.update_file(
|
|
path=file_path,
|
|
message="Update " + str(file_path),
|
|
content=updated_file_content,
|
|
branch=self.active_branch,
|
|
sha=self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
).sha,
|
|
)
|
|
return "Updated file " + str(file_path)
|
|
except Exception as e:
|
|
return "Unable to update file due to error:\n" + str(e)
|
|
|
|
def delete_file(self, file_path: str) -> str:
|
|
"""
|
|
Deletes a file from the repo
|
|
Parameters:
|
|
file_path(str): Where the file is
|
|
Returns:
|
|
str: Success or failure message
|
|
"""
|
|
if self.active_branch == self.github_base_branch:
|
|
return (
|
|
"You're attempting to commit to the directly"
|
|
f"to the {self.github_base_branch} branch, which is protected. "
|
|
"Please create a new branch and try again."
|
|
)
|
|
try:
|
|
self.github_repo_instance.delete_file(
|
|
path=file_path,
|
|
message="Delete " + file_path,
|
|
branch=self.active_branch,
|
|
sha=self.github_repo_instance.get_contents(
|
|
file_path, ref=self.active_branch
|
|
).sha,
|
|
)
|
|
return "Deleted file " + file_path
|
|
except Exception as e:
|
|
return "Unable to delete file due to error:\n" + str(e)
|
|
|
|
def search_issues_and_prs(self, query: str) -> str:
|
|
"""
|
|
Searches issues and pull requests in the repository.
|
|
|
|
Parameters:
|
|
query(str): The search query
|
|
|
|
Returns:
|
|
str: A string containing the first 5 issues and pull requests
|
|
"""
|
|
search_result = self.github.search_issues(query, repo=self.github_repository)
|
|
max_items = min(5, len(search_result))
|
|
results = [f"Top {max_items} results:"]
|
|
for issue in search_result[:max_items]:
|
|
results.append(
|
|
f"Title: {issue.title}, Number: {issue.number}, State: {issue.state}"
|
|
)
|
|
return "\n".join(results)
|
|
|
|
def search_code(self, query: str) -> str:
|
|
"""
|
|
Searches code in the repository.
|
|
# Todo: limit total tokens returned...
|
|
|
|
Parameters:
|
|
query(str): The search query
|
|
|
|
Returns:
|
|
str: A string containing, at most, the top 5 search results
|
|
"""
|
|
search_result = self.github.search_code(
|
|
query=query, repo=self.github_repository
|
|
)
|
|
if search_result.totalCount == 0:
|
|
return "0 results found."
|
|
max_results = min(5, search_result.totalCount)
|
|
results = [f"Showing top {max_results} of {search_result.totalCount} results:"]
|
|
count = 0
|
|
for code in search_result:
|
|
if count >= max_results:
|
|
break
|
|
# Get the file content using the PyGithub get_contents method
|
|
file_content = self.github_repo_instance.get_contents(
|
|
code.path, ref=self.active_branch
|
|
).decoded_content.decode()
|
|
results.append(
|
|
f"Filepath: `{code.path}`\nFile contents: "
|
|
f"{file_content}\n<END OF FILE>"
|
|
)
|
|
count += 1
|
|
return "\n".join(results)
|
|
|
|
def create_review_request(self, reviewer_username: str) -> str:
|
|
"""
|
|
Creates a review request on *THE* open pull request
|
|
that matches the current active_branch.
|
|
|
|
Parameters:
|
|
reviewer_username(str): The username of the person who is being requested
|
|
|
|
Returns:
|
|
str: A message confirming the creation of the review request
|
|
"""
|
|
pull_requests = self.github_repo_instance.get_pulls(
|
|
state="open", sort="created"
|
|
)
|
|
# find PR against active_branch
|
|
pr = next(
|
|
(pr for pr in pull_requests if pr.head.ref == self.active_branch), None
|
|
)
|
|
if pr is None:
|
|
return (
|
|
"No open pull request found for the "
|
|
f"current branch `{self.active_branch}`"
|
|
)
|
|
|
|
try:
|
|
pr.create_review_request(reviewers=[reviewer_username])
|
|
return (
|
|
f"Review request created for user {reviewer_username} "
|
|
f"on PR #{pr.number}"
|
|
)
|
|
except Exception as e:
|
|
return f"Failed to create a review request with error {e}"
|
|
|
|
def run(self, mode: str, query: str) -> str:
|
|
if mode == "get_issue":
|
|
return json.dumps(self.get_issue(int(query)))
|
|
elif mode == "get_pull_request":
|
|
return json.dumps(self.get_pull_request(int(query)))
|
|
elif mode == "list_pull_request_files":
|
|
return json.dumps(self.list_pull_request_files(int(query)))
|
|
elif mode == "get_issues":
|
|
return self.get_issues()
|
|
elif mode == "comment_on_issue":
|
|
return self.comment_on_issue(query)
|
|
elif mode == "create_file":
|
|
return self.create_file(query)
|
|
elif mode == "create_pull_request":
|
|
return self.create_pull_request(query)
|
|
elif mode == "read_file":
|
|
return self.read_file(query)
|
|
elif mode == "update_file":
|
|
return self.update_file(query)
|
|
elif mode == "delete_file":
|
|
return self.delete_file(query)
|
|
elif mode == "list_open_pull_requests":
|
|
return self.list_open_pull_requests()
|
|
elif mode == "list_files_in_main_branch":
|
|
return self.list_files_in_main_branch()
|
|
elif mode == "list_files_in_bot_branch":
|
|
return self.list_files_in_bot_branch()
|
|
elif mode == "list_branches_in_repo":
|
|
return self.list_branches_in_repo()
|
|
elif mode == "set_active_branch":
|
|
return self.set_active_branch(query)
|
|
elif mode == "create_branch":
|
|
return self.create_branch(query)
|
|
elif mode == "get_files_from_directory":
|
|
return self.get_files_from_directory(query)
|
|
elif mode == "search_issues_and_prs":
|
|
return self.search_issues_and_prs(query)
|
|
elif mode == "search_code":
|
|
return self.search_code(query)
|
|
elif mode == "create_review_request":
|
|
return self.create_review_request(query)
|
|
else:
|
|
raise ValueError("Invalid mode" + mode)
|