2023-03-17 15:55:15 +00:00
from env import settings
2023-03-21 12:49:47 +00:00
from typing import Dict
2023-03-17 15:55:15 +00:00
import requests
from llama_index . readers . database import DatabaseReader
from llama_index import GPTSimpleVectorIndex
2023-03-19 06:17:36 +00:00
from bs4 import BeautifulSoup
2023-03-21 12:49:47 +00:00
from langchain . agents . agent import AgentExecutor
2023-03-17 15:55:15 +00:00
2023-03-18 09:20:18 +00:00
import subprocess
2023-03-22 00:34:52 +00:00
from tools . base import tool , BaseToolSet
from logger import logger
2023-03-18 09:20:18 +00:00
2023-03-18 12:26:19 +00:00
class Terminal(BaseToolSet):
    """Tool set exposing a shell to the agent."""

    @tool(
        name="Terminal",
        description="Executes commands in a terminal. "
        "You can install packages with pip, apt, etc. "
        "Input should be valid commands, "
        "and the output will be any output from running that command.",
    )
    def execute(self, commands: str) -> str:
        """Run commands and return final output.

        Args:
            commands: Command line handed to the shell unchanged.

        Returns:
            The command's stdout with stderr merged into it, or the
            exception text when the command could not be run at all.
        """
        try:
            # NOTE(security): shell=True on agent-provided text is what this
            # tool exists for; only expose it inside a sandboxed environment.
            completed = subprocess.run(
                commands,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,  # interleave stderr with stdout
            )
            # A strict decode would raise on the first non-UTF-8 byte and
            # throw the whole output away; substitute undecodable bytes.
            output = completed.stdout.decode(errors="replace")
        except Exception as e:
            output = str(e)

        logger.debug(
            f"\nProcessed Terminal, Input Commands: {commands} "
            f"Output Answer: {output}"
        )
        return output
2023-03-17 15:55:15 +00:00
2023-03-21 12:20:15 +00:00
class CodeEditor(BaseToolSet):
    """Tool set for reading, appending to, patching and clearing files.

    Every tool reports failures by returning the exception text rather than
    raising, so a malformed request does not abort the calling agent.
    """

    @tool(
        name="CodeEditor.READ",
        description="Read and understand code. "
        "Input should be filename and line number group. ex. test.py,1-10 "
        "and the output will be code.",
    )
    def read(self, inputs: str) -> str:
        """Return one line or an inclusive line range of a file.

        Args:
            inputs: ``"<filename>,<line>"`` or ``"<filename>,<first>-<last>"``
                with 1-based line numbers.

        Returns:
            The requested text, or an error message when the input is
            malformed or the file/line cannot be read.
        """
        try:
            # Parsing happens inside the try so a malformed request yields an
            # error message like any other failure.
            filename, line_spec = inputs.split(",")
            bounds = [int(part) for part in line_spec.split("-")]
            if bounds[0] < 1:
                # Index -1 would silently return the last line instead.
                raise ValueError("line numbers start at 1")

            with open(filename, "r") as f:
                lines = f.readlines()

            if len(bounds) == 1:
                output = lines[bounds[0] - 1]
            else:
                output = "".join(lines[bounds[0] - 1 : bounds[1]])
        except Exception as e:
            output = str(e)

        logger.debug(
            f"\nProcessed CodeEditor.READ, Input Commands: {inputs} "
            f"Output Answer: {output}"
        )
        return output

    @tool(
        name="CodeEditor.WRITE",
        description="Write code to create a new tool. "
        "You must check the file's contents before writing. This tool only supports append code. "
        "If the code is completed, use the Terminal tool to execute it, if not, append the code through the CodeEditor tool. "
        "Input should be filename and code. "
        "ex. test.py\nprint('hello world')\n"
        "and the output will be last 3 line.",
    )
    def write(self, inputs: str) -> str:
        """Append code to a file.

        Args:
            inputs: The filename on the first line, followed by the code to
                append on the remaining lines.

        Returns:
            The last three lines that were written, or an error message when
            the input has no code part or the file cannot be written.
        """
        # partition never raises, so `code` is always bound for the log line.
        filename, newline, code = inputs.partition("\n")
        try:
            if not newline:
                raise ValueError(
                    "input must be a filename, a newline, then the code"
                )
            with open(filename, "a") as f:
                f.write(code)
            output = "Last 3 line was:\n" + "\n".join(code.split("\n")[-3:])
        except Exception as e:
            output = str(e)

        logger.debug(
            f"\nProcessed CodeEditor, Input Codes: {code} " f"Output Answer: {output}"
        )
        return output

    @tool(
        name="CodeEditor.PATCH",
        description="Correct the error throught the code patch if an error occurs. "
        "Input should be list of filename, line number, new line (Be sure to consider indentations.) Seperated by -||-. "
        "ex. \"test.py-||-1-||-print('hello world')\ntest.py-||-2-||-print('hello world')\n\" "
        "and the output will be success or error message.",
    )
    def patch(self, patches: str) -> str:
        """Replace individual lines in one or more files.

        Args:
            patches: Newline-separated entries of the form
                ``<filename>-||-<line number>-||-<new line>`` with 1-based
                line numbers. Blank entries are ignored.

        Returns:
            ``"success"`` when every entry was applied, otherwise one error
            message per failed entry joined by newlines.
        """
        errors = []
        applied = 0
        for patch in patches.split("\n"):
            if not patch.strip():
                # The documented input ends with "\n", which leaves an empty
                # trailing entry after splitting.
                continue
            try:
                # maxsplit keeps a "-||-" inside the replacement text intact.
                filename, line_number, new_line = patch.split("-||-", 2)
                index = int(line_number) - 1
                if index < 0:
                    # A negative index would silently patch from the end.
                    raise ValueError("line numbers start at 1")

                with open(filename, "r") as f:
                    lines = f.readlines()
                lines[index] = new_line + "\n"
                with open(filename, "w") as f:
                    f.writelines(lines)
                applied += 1
            except Exception as e:
                # Keep going, but remember the failure so a later success
                # cannot mask it.
                errors.append(f"{patch}: {e}")

        if errors:
            output = "\n".join(errors)
        elif applied:
            output = "success"
        else:
            output = "no patches were provided"

        logger.debug(
            f"\nProcessed CodeEditor, Input Patch: {patches} "
            f"Output Answer: {output}"
        )
        return output

    @tool(
        name="CodeEditor.DELETE",
        description="Delete code in file for a new start. "
        "Input should be filename. "
        "ex. test.py "
        "Output will be success or error message.",
    )
    def delete(self, inputs: str) -> str:
        """Empty a file, creating it if it does not exist.

        Args:
            inputs: Name of the file to truncate.

        Returns:
            ``"success"``, or an error message when the file cannot be opened.
        """
        filename = inputs
        try:
            # Opening in "w" mode already truncates the file.
            with open(filename, "w") as f:
                f.write("")
            output = "success"
        except Exception as e:
            output = str(e)

        logger.debug(
            f"\nProcessed CodeEditor, Input filename: {inputs} "
            f"Output Answer: {output}"
        )
        return output
2023-03-21 12:20:15 +00:00
2023-03-18 12:26:19 +00:00
class RequestsGet(BaseToolSet):
    """Tool set that fetches a web page and returns its readable text."""

    @tool(
        name="requests_get",
        description="A portal to the internet. "
        "Use this when you need to get specific content from a website. "
        "Input should be a url (i.e. https://www.google.com). "
        "The output will be the text response of the GET request.",
    )
    def get(self, url: str) -> str:
        """Fetch a URL and return the start of its visible text.

        Args:
            url: Address to request with HTTP GET.

        Returns:
            The page text with script, style, header, footer and form
            elements removed, cut to 300 characters followed by ``...`` when
            longer.

        Raises:
            requests.RequestException: When the request fails or times out.
        """
        # Without a timeout an unresponsive server would block the agent
        # indefinitely.
        html = requests.get(url, timeout=10).text
        # Name the parser explicitly so results do not depend on which
        # optional parsers happen to be installed.
        soup = BeautifulSoup(html, "html.parser")

        # Drop elements that carry no readable page content.
        for tag in soup.find_all(["script", "style", "header", "footer", "form"]):
            tag.extract()

        content = soup.get_text("\n", strip=True)
        if len(content) > 300:
            content = content[:300] + "..."

        logger.debug(
            f"\nProcessed RequestsGet, Input Url: {url} " f"Output Contents: {content}"
        )
        return content
2023-03-17 15:55:15 +00:00
2023-03-18 12:26:19 +00:00
class WineDB(BaseToolSet):
    """Tool set that recommends wines from an index built over the wine table."""

    def __init__(self):
        """Load the wine rows from PostgreSQL and build a vector index.

        Each row becomes one document whose text is the configured columns
        joined by ``self.separator``.
        """
        db = DatabaseReader(
            scheme="postgresql",  # Database Scheme
            host=settings["WINEDB_HOST"],  # Database Host
            port="5432",  # Database Port
            user="alphadom",  # Database User
            password=settings["WINEDB_PASSWORD"],  # Database Password
            dbname="postgres",  # Database Name
        )
        self.columns = ["nameEn", "nameKo", "description"]
        # Single source of truth for the delimiter: used to build the SQL
        # below and to split the text back into columns in `recommend`.
        self.separator = " - "

        # Column names and separator are constants defined above, not user
        # input, so interpolating them into the SQL text is safe here.
        concat_columns = f", '{self.separator}', ".join(
            f'"{column}"' for column in self.columns
        )
        query = f"""
            SELECT
                Concat({concat_columns})
            FROM wine
        """
        documents = db.load_data(query=query)
        self.index = GPTSimpleVectorIndex(documents)

    @tool(
        name="Wine Recommendation",
        description="A tool to recommend wines based on a user's input. "
        "Inputs are necessary factors for wine recommendations, such as the user's mood today, side dishes to eat with wine, people to drink wine with, what things you want to do, the scent and taste of their favorite wine. "
        "The output will be a list of recommended wines. "
        "The tool is based on a database of wine reviews, which is stored in a database.",
    )
    def recommend(self, query: str) -> str:
        """Query the index and describe the best-matching wine.

        Args:
            query: Free-text description of what the user is looking for.

        Returns:
            The index's response, a blank line, then one ``column: value``
            line per column of the first source node (nothing when the query
            returned no source nodes).
        """
        results = self.index.query(query)

        wine = ""
        if results.source_nodes:
            # Limit the split to the number of columns so a separator inside
            # the last column (the description) does not truncate it.
            values = results.source_nodes[0].source_text.split(
                self.separator, len(self.columns) - 1
            )
            wine = "\n".join(
                f"{column}: {value}" for column, value in zip(self.columns, values)
            )

        # NOTE(review): the response is presumably optional in llama_index;
        # fall back to an empty string instead of failing on None — confirm.
        output = (results.response or "") + "\n\n" + wine

        logger.debug(
            f"\nProcessed WineDB, Input Query: {query} " f"Output Wine: {wine}"
        )
        return output
2023-03-17 15:55:15 +00:00
2023-03-18 12:26:19 +00:00
class ExitConversation(BaseToolSet):
    """Tool set that lets the agent end a user's conversation."""

    def __init__(self, executors: Dict[str, AgentExecutor]):
        """Keep a reference to the shared mapping of key to agent executor.

        Args:
            executors: Mapping this tool removes entries from; it is stored
                by reference, not copied.
        """
        self.executors = executors

    @tool(
        name="Exit Conversation",
        description="A tool to exit the conversation. "
        "Use this when you want to exit the conversation. "
        "Input should be a user's key. "
        "The output will be a message that the conversation is over.",
    )
    def exit(self, key: str) -> str:
        """Remove the executor registered under ``key``.

        Args:
            key: Key of the conversation to end.

        Returns:
            A fixed confirmation message.
        """
        # A default makes the call idempotent: an unknown or already-removed
        # key must not raise KeyError.
        self.executors.pop(key, None)

        logger.debug("\nProcessed ExitConversation.")

        return "Exit conversation."