mirror of https://github.com/corca-ai/EVAL
feat: backmerge
commit
311539f2eb
@ -1,3 +0,0 @@
|
||||
from .patch import CodePatcher
|
||||
from .read import CodeReader
|
||||
from .write import CodeWriter
|
@ -1,35 +0,0 @@
|
||||
"""
|
||||
read protocol:
|
||||
|
||||
<filepath>|<start line>-<end line>
|
||||
"""
|
||||
|
||||
|
||||
class ReadCommand:
|
||||
separator = "|"
|
||||
|
||||
def __init__(self, filepath: str, start: int, end: int):
|
||||
self.filepath: str = filepath
|
||||
self.start: int = start
|
||||
self.end: int = end
|
||||
|
||||
def execute(self):
|
||||
with open(self.filepath, "r") as f:
|
||||
code = f.readlines()
|
||||
if self.start == self.end:
|
||||
code = code[self.start - 1]
|
||||
else:
|
||||
code = "".join(code[self.start - 1 : self.end])
|
||||
return code
|
||||
|
||||
@staticmethod
|
||||
def from_str(command: str) -> "ReadCommand":
|
||||
filepath, line = command.split(ReadCommand.separator)
|
||||
start, end = line.split("-")
|
||||
return ReadCommand(filepath, int(start), int(end))
|
||||
|
||||
|
||||
class CodeReader:
|
||||
@staticmethod
|
||||
def read(command: str) -> str:
|
||||
return ReadCommand.from_str(command).execute()
|
@ -0,0 +1,142 @@
|
||||
from core.tools.base import tool, BaseToolSet
|
||||
from logger import logger
|
||||
|
||||
from .patch import CodePatcher
|
||||
from .read import CodeReader
|
||||
from .write import CodeWriter
|
||||
|
||||
|
||||
class CodeEditor(BaseToolSet):
|
||||
@tool(
|
||||
name="CodeEditor.READ",
|
||||
description="Read and understand code. "
|
||||
f"Input should be filename and line number group. ex. test.py|1-10 "
|
||||
"and the output will be code. ",
|
||||
)
|
||||
def read(self, inputs: str) -> str:
|
||||
try:
|
||||
output = CodeReader.read(inputs)
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.READ, Input Commands: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.SUMMARY",
|
||||
description="Summary code. "
|
||||
"Read the code structured into a tree. "
|
||||
"If you set specific line, it will show the code from the specific line. "
|
||||
"Input should be filename, depth, and specific line if you want. ex. test.py|2 or test.py|3|print('hello world') "
|
||||
"and the output will be list of (line number: code). ",
|
||||
)
|
||||
def summary(self, inputs: str) -> str:
|
||||
try:
|
||||
output = CodeReader.summary(inputs)
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.SUMMARY, Input Commands: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.APPEND",
|
||||
description="Append code to the existing file. "
|
||||
"If the code is completed, use the Terminal tool to execute it, if not, append the code through the this tool. "
|
||||
"Input should be filename and code to append. "
|
||||
"Input code must be the code that should be appended, NOT whole code. "
|
||||
"ex. test.py\nprint('hello world')\n "
|
||||
"and the output will be last 3 line.",
|
||||
)
|
||||
def append(self, inputs: str) -> str:
|
||||
try:
|
||||
code = CodeWriter.append(inputs)
|
||||
output = (
|
||||
"Last 3 line was:\n"
|
||||
+ "\n".join(code.split("\n")[-3:])
|
||||
+ "\nYou can use CodeEditor.APPEND tool to append the code if it is not completed."
|
||||
)
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.APPEND, Input: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.WRITE",
|
||||
description="Write code to create a new tool. "
|
||||
"If the code is completed, use the Terminal tool to execute it, if not, append the code through the CodeEditor.APPEND tool. "
|
||||
"Input should be filename and code. This file must be in playground folder. "
|
||||
"ex. test.py\nprint('hello world')\n "
|
||||
"and the output will be last 3 line.",
|
||||
)
|
||||
def write(self, inputs: str) -> str:
|
||||
try:
|
||||
code = CodeWriter.write(inputs)
|
||||
output = (
|
||||
"Last 3 line was:\n"
|
||||
+ "\n".join(code.split("\n")[-3:])
|
||||
+ "\nYou can use CodeEditor.APPEND tool to append the code if it is not completed."
|
||||
)
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.WRITE, Input: {inputs} " f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.PATCH",
|
||||
description="Patch the code to correct the error if an error occurs or to improve it. "
|
||||
"Input is a list of patches. The patch is separated by {seperator}. ".format(
|
||||
seperator=CodePatcher.separator.replace("\n", "\\n")
|
||||
)
|
||||
+ "Each patch has to be formatted like below.\n"
|
||||
"<filepath>|<start_line>,<start_col>|<end_line>,<end_col>|<new_code>"
|
||||
"Code between start and end will be replaced with new_code. "
|
||||
"The output will be written/deleted bytes or error message. ",
|
||||
)
|
||||
def patch(self, patches: str) -> str:
|
||||
try:
|
||||
w, d = CodePatcher.patch(patches)
|
||||
output = f"successfully wrote {w}, deleted {d}"
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.PATCH, Input Patch: {patches} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
@tool(
|
||||
name="CodeEditor.DELETE",
|
||||
description="Delete code in file for a new start. "
|
||||
"Input should be filename."
|
||||
"ex. test.py "
|
||||
"Output will be success or error message.",
|
||||
)
|
||||
def delete(self, inputs: str) -> str:
|
||||
filename = inputs
|
||||
try:
|
||||
with open(filename, "w") as f:
|
||||
f.write("")
|
||||
output = "success"
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed CodeEditor.DELETE, Input filename: {inputs} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
@ -0,0 +1,173 @@
|
||||
"""
|
||||
read protocol:
|
||||
|
||||
<filepath>|<start line>-<end line>
|
||||
"""
|
||||
from typing import Optional, List, Tuple
|
||||
|
||||
|
||||
class Line:
|
||||
def __init__(self, content: str, line_number: int, depth: int):
|
||||
self.__content: str = content
|
||||
self.__line_number: int = line_number
|
||||
self.__depth: int = depth
|
||||
self.__children: List[Line] = []
|
||||
|
||||
def get_content(self) -> str:
|
||||
return self.__content
|
||||
|
||||
def get_depth(self) -> int:
|
||||
return self.__depth
|
||||
|
||||
def append_child(self, child: "Line") -> None:
|
||||
self.__children.append(child)
|
||||
|
||||
def find_by_lte_depth(self, depth: int) -> List["Line"]:
|
||||
if self.__depth > depth:
|
||||
return []
|
||||
|
||||
lines: List[Line] = [self]
|
||||
for child in self.__children:
|
||||
lines += child.find_by_lte_depth(depth)
|
||||
return lines
|
||||
|
||||
def find_by_content(self, content: str) -> List["Line"]:
|
||||
if content in self.__content:
|
||||
return [self]
|
||||
|
||||
lines: List[Line] = []
|
||||
for child in self.__children:
|
||||
lines += child.find_by_content(content)
|
||||
return lines
|
||||
|
||||
def find_last_lines(self) -> List["Line"]:
|
||||
if len(self.__children) == 0:
|
||||
return [self]
|
||||
else:
|
||||
return [self, *self.__children[-1].find_last_lines()]
|
||||
|
||||
def print(self, depth: int = 0) -> None:
|
||||
print(f"{' ' * depth}{self}", end="")
|
||||
for child in self.__children:
|
||||
child.print(depth + 1)
|
||||
|
||||
def __repr__(self):
|
||||
return f"{self.__line_number}: {self.__content}"
|
||||
|
||||
|
||||
class CodeTree:
|
||||
def __init__(self):
|
||||
self.root: Line = Line("\n", -1, -1)
|
||||
|
||||
def append(self, content: str, line_number: int) -> None:
|
||||
last_lines: List[Line] = self.root.find_last_lines()
|
||||
new_leading_spaces: int = self.__get_leading_spaces(content)
|
||||
|
||||
previous_line: Line = self.root
|
||||
previous_leading_spaces: int = -1
|
||||
for line in last_lines:
|
||||
leading_spaces = self.__get_leading_spaces(line.get_content())
|
||||
if (
|
||||
previous_leading_spaces < new_leading_spaces
|
||||
and new_leading_spaces <= leading_spaces
|
||||
):
|
||||
break
|
||||
previous_line, previous_leading_spaces = line, leading_spaces
|
||||
|
||||
new_line_depth: int = previous_line.get_depth() + 1
|
||||
previous_line.append_child(Line(content, line_number, new_line_depth))
|
||||
|
||||
def find_from_root(self, depth: int) -> List[Line]:
|
||||
return self.root.find_by_lte_depth(depth)
|
||||
|
||||
def find_from_parent(self, depth: int, parent_content: str) -> List[Line]:
|
||||
lines: List[Line] = self.root.find_by_content(parent_content)
|
||||
if len(lines) == 0:
|
||||
return []
|
||||
parent = lines[0]
|
||||
return parent.find_by_lte_depth(depth + parent.get_depth())
|
||||
|
||||
def print(self):
|
||||
print("Code Tree:")
|
||||
print("=================================")
|
||||
self.root.print()
|
||||
print("=================================")
|
||||
|
||||
def __get_leading_spaces(self, content: str) -> int:
|
||||
return len(content) - len(content.lstrip())
|
||||
|
||||
|
||||
class ReadCommand:
|
||||
separator = "|"
|
||||
|
||||
def __init__(self, filepath: str, start: int, end: int):
|
||||
self.filepath: str = filepath
|
||||
self.start: int = start
|
||||
self.end: int = end
|
||||
|
||||
def execute(self) -> str:
|
||||
with open(self.filepath, "r") as f:
|
||||
code = f.readlines()
|
||||
|
||||
if self.start == self.end:
|
||||
code = code[self.start - 1]
|
||||
else:
|
||||
code = "".join(code[self.start - 1 : self.end])
|
||||
return code
|
||||
|
||||
@staticmethod
|
||||
def from_str(command: str) -> "ReadCommand":
|
||||
filepath, line = command.split(ReadCommand.separator)
|
||||
start, end = line.split("-")
|
||||
return ReadCommand(filepath, int(start), int(end))
|
||||
|
||||
|
||||
class SummaryCommand:
|
||||
separator = "|"
|
||||
|
||||
def __init__(self, filepath: str, depth: int, parent_content: Optional[str] = None):
|
||||
self.filepath: str = filepath
|
||||
self.depth: int = depth
|
||||
self.parent_content: Optional[str] = parent_content
|
||||
|
||||
def execute(self) -> str:
|
||||
with open(self.filepath, "r") as f:
|
||||
code = f.readlines()
|
||||
|
||||
code_tree = CodeTree()
|
||||
for i, line in enumerate(code):
|
||||
if line.strip() != "":
|
||||
code_tree.append(line, i + 1)
|
||||
|
||||
# code_tree.print()
|
||||
|
||||
if self.parent_content is None:
|
||||
lines = code_tree.find_from_root(self.depth)
|
||||
else:
|
||||
lines = code_tree.find_from_parent(self.depth, self.parent_content)
|
||||
return "".join([str(line) for line in lines])
|
||||
|
||||
@staticmethod
|
||||
def from_str(command: str) -> "SummaryCommand":
|
||||
command_list: List[str] = command.split(SummaryCommand.separator)
|
||||
filepath: str = command_list[0]
|
||||
depth: int = int(command_list[1])
|
||||
parent_content: str | None = command_list[2] if len(command_list) == 3 else None
|
||||
return SummaryCommand(
|
||||
filepath=filepath, depth=depth, parent_content=parent_content
|
||||
)
|
||||
|
||||
|
||||
class CodeReader:
|
||||
@staticmethod
|
||||
def read(command: str) -> str:
|
||||
return ReadCommand.from_str(command).execute()
|
||||
|
||||
@staticmethod
|
||||
def summary(command: str) -> str:
|
||||
return SummaryCommand.from_str(command).execute()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
summary = CodeReader.summary("read.py|1|class ReadCommand:")
|
||||
print(summary)
|
@ -0,0 +1,67 @@
|
||||
import subprocess
|
||||
from typing import Dict, List
|
||||
|
||||
from tempfile import TemporaryFile
|
||||
|
||||
from env import settings
|
||||
from logger import logger
|
||||
from core.tools.base import tool, BaseToolSet, ToolScope, SessionGetter
|
||||
from core.tools.terminal.syscall import SyscallTracer
|
||||
|
||||
|
||||
class Terminal(BaseToolSet):
|
||||
def __init__(self):
|
||||
self.sessions: Dict[str, List[SyscallTracer]] = {}
|
||||
|
||||
@tool(
|
||||
name="Terminal",
|
||||
description="Executes commands in a terminal."
|
||||
"If linux errno occurs, we have to solve the problem with the terminal. "
|
||||
"It can't execute interactive operations or blocking operations. "
|
||||
"Input should be valid commands, "
|
||||
"and the output will be any output from running that command.",
|
||||
scope=ToolScope.SESSION,
|
||||
)
|
||||
def execute(self, commands: str, get_session: SessionGetter) -> str:
|
||||
session, _ = get_session()
|
||||
|
||||
try:
|
||||
with TemporaryFile() as fp:
|
||||
process = subprocess.Popen(
|
||||
commands,
|
||||
shell=True,
|
||||
cwd=settings["PLAYGROUND_DIR"],
|
||||
stdout=fp,
|
||||
stderr=fp,
|
||||
)
|
||||
|
||||
tracer = SyscallTracer(process.pid)
|
||||
tracer.attach()
|
||||
exitcode, reason = tracer.wait_until_stop_or_exit()
|
||||
logger.debug(f"Stopped terminal execution: {exitcode} {reason}")
|
||||
|
||||
fp.seek(0)
|
||||
output = fp.read().decode()
|
||||
except Exception as e:
|
||||
output = str(e)
|
||||
|
||||
if len(output) > 1000:
|
||||
output = output[:1000] + "..."
|
||||
|
||||
logger.debug(
|
||||
f"\nProcessed Terminal, Input Commands: {commands} "
|
||||
f"Output Answer: {output}"
|
||||
)
|
||||
return output
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import time
|
||||
|
||||
o = Terminal().execute(
|
||||
"sleep 1; echo 1; sleep 2; echo 2; sleep 3; echo 3; sleep 10;",
|
||||
lambda: ("", None),
|
||||
)
|
||||
print(o)
|
||||
|
||||
time.sleep(10) # see if timer has reset
|
@ -0,0 +1,103 @@
|
||||
from typing import Tuple, Optional
|
||||
import signal
|
||||
|
||||
from ptrace.debugger import (
|
||||
PtraceDebugger,
|
||||
PtraceProcess,
|
||||
ProcessExit,
|
||||
ProcessSignal,
|
||||
NewProcessEvent,
|
||||
ProcessExecution,
|
||||
)
|
||||
from ptrace.syscall import PtraceSyscall
|
||||
from ptrace.func_call import FunctionCallOptions
|
||||
from ptrace.tools import signal_to_exitcode
|
||||
|
||||
|
||||
class SyscallTimeoutException(Exception):
|
||||
def __init__(self, pid: int, *args) -> None:
|
||||
super().__init__(f"deadline exceeded while waiting syscall for {pid}", *args)
|
||||
|
||||
|
||||
class SyscallTracer:
|
||||
def __init__(self, pid: int):
|
||||
self.debugger: PtraceDebugger = PtraceDebugger()
|
||||
self.pid: int = pid
|
||||
self.process: PtraceProcess = None
|
||||
|
||||
def is_waiting(self, syscall: PtraceSyscall) -> bool:
|
||||
if syscall.name.startswith("wait"):
|
||||
return True
|
||||
return False
|
||||
|
||||
def attach(self):
|
||||
self.process = self.debugger.addProcess(self.pid, False)
|
||||
|
||||
def detach(self):
|
||||
self.process.detach()
|
||||
self.debugger.quit()
|
||||
|
||||
def set_timer(self, timeout: int):
|
||||
def handler(signum, frame):
|
||||
raise SyscallTimeoutException(self.process.pid)
|
||||
|
||||
signal.signal(signal.SIGALRM, handler)
|
||||
signal.alarm(timeout)
|
||||
|
||||
def reset_timer(self):
|
||||
signal.alarm(0)
|
||||
|
||||
def wait_syscall_with_timeout(self, timeout: int):
|
||||
self.set_timer(timeout)
|
||||
self.process.waitSyscall()
|
||||
self.reset_timer()
|
||||
|
||||
def wait_until_stop_or_exit(self) -> Tuple[Optional[int], str]:
|
||||
self.process.syscall()
|
||||
exitcode = None
|
||||
reason = ""
|
||||
while True:
|
||||
if not self.debugger:
|
||||
break
|
||||
|
||||
try:
|
||||
self.wait_syscall_with_timeout(5)
|
||||
except ProcessExit as event:
|
||||
if event.exitcode is not None:
|
||||
exitcode = event.exitcode
|
||||
continue
|
||||
except ProcessSignal as event:
|
||||
event.process.syscall(event.signum)
|
||||
exitcode = signal_to_exitcode(event.signum)
|
||||
reason = event.reason
|
||||
continue
|
||||
except NewProcessEvent as event:
|
||||
continue
|
||||
except ProcessExecution as event:
|
||||
continue
|
||||
except Exception as e:
|
||||
reason = str(e)
|
||||
break
|
||||
|
||||
syscall = self.process.syscall_state.event(
|
||||
FunctionCallOptions(
|
||||
write_types=False,
|
||||
write_argname=False,
|
||||
string_max_length=300,
|
||||
replace_socketcall=True,
|
||||
write_address=False,
|
||||
max_array_count=20,
|
||||
)
|
||||
)
|
||||
|
||||
self.process.syscall()
|
||||
|
||||
if syscall is None:
|
||||
continue
|
||||
|
||||
if syscall.result:
|
||||
continue
|
||||
|
||||
self.reset_timer()
|
||||
|
||||
return exitcode, reason
|
Loading…
Reference in New Issue