@ -2,6 +2,7 @@
import json
import json
from typing import Any , Dict , List , Optional
from typing import Any , Dict , List , Optional
from github . Issue import Issue
from pydantic import BaseModel , Extra , root_validator
from pydantic import BaseModel , Extra , root_validator
from langchain . utils import get_from_dict_or_env
from langchain . utils import get_from_dict_or_env
@ -41,6 +42,7 @@ class GitHubAPIWrapper(BaseModel):
try :
try :
from github import Auth , GithubIntegration
from github import Auth , GithubIntegration
except ImportError :
except ImportError :
raise ImportError (
raise ImportError (
" PyGithub is not installed. "
" PyGithub is not installed. "
@ -69,70 +71,101 @@ class GitHubAPIWrapper(BaseModel):
return values
return values
def parse_issues ( self , issues : List [ dict ] ) - > List [ dict ] :
def parse_issues ( self , issues : List [ Issue ] ) - > List [ dict ] :
"""
Extracts title and number from each Issue and puts them in a dictionary
Parameters :
issues ( List [ Issue ] ) : A list of Github Issue objects
Returns :
List [ dict ] : A dictionary of issue titles and numbers
"""
parsed = [ ]
parsed = [ ]
for issue in issues :
for issue in issues :
title = issue [ " title " ]
title = issue . title
number = issue [ " number " ]
number = issue . number
parsed . append ( { " title " : title , " number " : number } )
parsed . append ( { " title " : title , " number " : number } )
return parsed
return parsed
def get_issues ( self ) - > str :
def get_issues ( self ) - > str :
"""
Fetches all open issues from the repo
Returns :
str : A plaintext report containing the number of issues
and each issue ' s title and number.
"""
issues = self . github_repo_instance . get_issues ( state = " open " )
issues = self . github_repo_instance . get_issues ( state = " open " )
if issues . totalCount > 0 :
parsed_issues = self . parse_issues ( issues )
parsed_issues = self . parse_issues ( issues )
parsed_issues_str = (
parsed_issues_str = (
" Found " + str ( len ( parsed_issues ) ) + " issues: \n " + str ( parsed_issues )
" Found " + str ( len ( parsed_issues ) ) + " issues: \n " + str ( parsed_issues )
)
)
return parsed_issues_str
return parsed_issues_str
else :
return " No open issues available "
def get_issue ( self , issue_number : int ) - > Dict [ str , Any ] :
def get_issue ( self , issue_number : int ) - > Dict [ str , Any ] :
"""
Fetches a specific issue and its first 10 comments
Parameters :
issue_number ( int ) : The number for the github issue
Returns :
dict : A doctionary containing the issue ' s title,
body , and comments as a string
"""
issue = self . github_repo_instance . get_issue ( number = issue_number )
issue = self . github_repo_instance . get_issue ( number = issue_number )
# If there are too many comments
# We can't add them all to context so for now we'll just skip
if issue . get_comments ( ) . totalCount > 10 :
return {
" message " : (
" There are too many comments to add them all to context. "
" Please visit the issue on GitHub to see them all. "
)
}
page = 0
page = 0
comments = [ ]
comments : List [ dict ] = [ ]
while True :
while len ( comments ) < = 10 :
comments_page = issue . get_comments ( ) . get_page ( page )
comments_page = issue . get_comments ( ) . get_page ( page )
if len ( comments_page ) == 0 :
if len ( comments_page ) == 0 :
break
break
for comment in comments_page :
for comment in comments_page :
comments . append (
comments . append ( { " body " : comment . body , " user " : comment . user . login } )
{ " body " : comment [ " body " ] , " user " : comment [ " user " ] [ " login " ] }
)
page + = 1
page + = 1
return {
return {
" title " : issue [ " title " ] ,
" title " : issue . title ,
" body " : issue [ " body " ] ,
" body " : issue . body ,
" comments " : str ( comments ) ,
" comments " : str ( comments ) ,
}
}
def comment_on_issue ( self , comment_query : str ) - > str :
def comment_on_issue ( self , comment_query : str ) - > str :
# comment_query is a string which contains the issue number and the comment
"""
# the issue number is the first word in the string
Adds a comment to a github issue
# the comment is the rest of the string
Parameters :
comment_query ( str ) : a string which contains the issue number ,
two newlines , and the comment .
for example : " 1 \n \n Working on it now "
adds the comment " working on it now " to issue 1
Returns :
str : A success or failure message
"""
issue_number = int ( comment_query . split ( " \n \n " ) [ 0 ] )
issue_number = int ( comment_query . split ( " \n \n " ) [ 0 ] )
comment = comment_query [ len ( str ( issue_number ) ) + 2 : ]
comment = comment_query [ len ( str ( issue_number ) ) + 2 : ]
try :
issue = self . github_repo_instance . get_issue ( number = issue_number )
issue = self . github_repo_instance . get_issue ( number = issue_number )
issue . create_comment ( comment )
issue . create_comment ( comment )
return " Commented on issue " + str ( issue_number )
return " Commented on issue " + str ( issue_number )
except Exception as e :
return " Unable to make comment due to error: \n " + str ( e )
def create_file ( self , file_query : str ) - > str :
def create_file ( self , file_query : str ) - > str :
# file_query is a string which contains the file path and the file contents
"""
# the file path is the first line in the string
Creates a new file on the Github repo
# the file contents is the rest of the string
Parameters :
file_query ( str ) : a string which contains the file path
and the file contents . The file path is the first line
in the string , and the contents are the rest of the string .
For example , " hello_world.md \n # Hello World! "
Returns :
str : A success or failure message
"""
file_path = file_query . split ( " \n " ) [ 0 ]
file_path = file_query . split ( " \n " ) [ 0 ]
file_contents = file_query [ len ( file_path ) + 2 : ]
file_contents = file_query [ len ( file_path ) + 2 : ]
try :
exists = self . github_repo_instance . get_contents ( file_path )
if exists is None :
self . github_repo_instance . create_file (
self . github_repo_instance . create_file (
path = file_path ,
path = file_path ,
message = " Create " + file_path ,
message = " Create " + file_path ,
@ -140,32 +173,48 @@ class GitHubAPIWrapper(BaseModel):
branch = self . github_branch ,
branch = self . github_branch ,
)
)
return " Created file " + file_path
return " Created file " + file_path
else :
return f " File already exists at { file_path } . Use update_file instead "
except Exception as e :
return " Unable to make file due to error: \n " + str ( e )
def read_file ( self , file_path : str ) - > str :
def read_file ( self , file_path : str ) - > str :
# file_path is a string which contains the file path
"""
Reads a file from the github repo
Parameters :
file_path ( str ) : the file path
Returns :
str : The file decoded as a string
"""
file = self . github_repo_instance . get_contents ( file_path )
file = self . github_repo_instance . get_contents ( file_path )
return file . decoded_content . decode ( " utf-8 " )
return file . decoded_content . decode ( " utf-8 " )
def update_file ( self , file_query : str ) - > str :
def update_file ( self , file_query : str ) - > str :
# file_query is a string which contains the file path and the file contents
"""
# the file path is the first line in the string
Updates a file with new content .
# the old file contents is wrapped in OLD <<<< and >>>> OLD
Parameters :
# the new file contents is wrapped in NEW <<<< and >>>> NEW
file_query ( str ) : Contains the file path and the file contents .
The old file contents is wrapped in OLD << << and >> >> OLD
# for example:
The new file contents is wrapped in NEW << << and >> >> NEW
For example :
# /test/test.txt
/ test / hello . txt
# OLD <<<<
OLD << <<
# old contents
Hello Earth !
# >>>> OLD
>> >> OLD
# NEW <<<<
NEW << <<
# new contents
Hello Mars !
# >>>> NEW
>> >> NEW
Returns :
# the old contents will be replaced with the new contents
A success or failure message
"""
try :
file_path = file_query . split ( " \n " ) [ 0 ]
file_path = file_query . split ( " \n " ) [ 0 ]
old_file_contents = file_query . split ( " OLD <<<< " ) [ 1 ] . split ( " >>>> OLD " ) [ 0 ] . strip ( )
old_file_contents = (
new_file_contents = file_query . split ( " NEW <<<< " ) [ 1 ] . split ( " >>>> NEW " ) [ 0 ] . strip ( )
file_query . split ( " OLD <<<< " ) [ 1 ] . split ( " >>>> OLD " ) [ 0 ] . strip ( )
)
new_file_contents = (
file_query . split ( " NEW <<<< " ) [ 1 ] . split ( " >>>> NEW " ) [ 0 ] . strip ( )
)
file_content = self . read_file ( file_path )
file_content = self . read_file ( file_path )
updated_file_content = file_content . replace (
updated_file_content = file_content . replace (
@ -174,7 +223,7 @@ class GitHubAPIWrapper(BaseModel):
if file_content == updated_file_content :
if file_content == updated_file_content :
return (
return (
" File content was not updated because the old content was not found. "
" File content was not updated because old content was not found."
" It may be helpful to use the read_file action to get "
" It may be helpful to use the read_file action to get "
" the current file contents. "
" the current file contents. "
)
)
@ -187,9 +236,18 @@ class GitHubAPIWrapper(BaseModel):
sha = self . github_repo_instance . get_contents ( file_path ) . sha ,
sha = self . github_repo_instance . get_contents ( file_path ) . sha ,
)
)
return " Updated file " + file_path
return " Updated file " + file_path
except Exception as e :
return " Unable to update file due to error: \n " + str ( e )
def delete_file ( self , file_path : str ) - > str :
def delete_file ( self , file_path : str ) - > str :
# file_path is a string which contains the file path
"""
Deletes a file from the repo
Parameters :
file_path ( str ) : Where the file is
Returns :
str : Success or failure message
"""
try :
file = self . github_repo_instance . get_contents ( file_path )
file = self . github_repo_instance . get_contents ( file_path )
self . github_repo_instance . delete_file (
self . github_repo_instance . delete_file (
path = file_path ,
path = file_path ,
@ -198,6 +256,8 @@ class GitHubAPIWrapper(BaseModel):
sha = file . sha ,
sha = file . sha ,
)
)
return " Deleted file " + file_path
return " Deleted file " + file_path
except Exception as e :
return " Unable to delete file due to error: \n " + str ( e )
def run ( self , mode : str , query : str ) - > str :
def run ( self , mode : str , query : str ) - > str :
if mode == " get_issues " :
if mode == " get_issues " :