From ed97aa65af340ccc8432c2147ee502be74a3f6dd Mon Sep 17 00:00:00 2001 From: blob42 Date: Thu, 2 Mar 2023 19:11:58 +0100 Subject: [PATCH] exec_run: add timeout and delay params - use `delay` to wait for sent payload to finish - use `timeout` to control how long to wait for output --- langchain/utilities/docker/__init__.py | 45 ++++++++++++++++++-------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/langchain/utilities/docker/__init__.py b/langchain/utilities/docker/__init__.py index 1325a777..4020b276 100644 --- a/langchain/utilities/docker/__init__.py +++ b/langchain/utilities/docker/__init__.py @@ -48,10 +48,17 @@ def _check_gvisor_runtime(): _check_gvisor_runtime() +#TODO!: using pexpect to with containers +# TODO: add default expect pattern to image template +# TODO: pass max reads parameters for read trials +# NOTE: spawning with tty true or not gives slightly different stdout format +# NOTE: echo=False works when tty is disabled and only stdin is connected + + class DockerSocket: """Wrapper around docker API's socket object. Can be used as a context manager.""" - _timeout: int = 10 + _timeout: int = 5 def __init__(self, socket, timeout: int = _timeout): @@ -120,7 +127,7 @@ class DockerSocket: # the first recv is blocking to wait for the container to start header = self.socket._sock.recv(8) except BlockingIOError: - print("[header] blocking IO") + # logger.debug("[header] blocking IO") break self.socket._sock.setblocking(False) @@ -135,7 +142,7 @@ class DockerSocket: try: chunk = self.socket._sock.recv(min(size, SOCK_BUF_SIZE)) except BlockingIOError: - print("[body] blocking IO") + # logger.debug("[body] blocking IO") break if chunk == b'': raise ValueError("incomplete read from container output") @@ -301,7 +308,10 @@ class DockerWrapper(BaseModel, extra=Extra.allow): - def exec_run(self, query: str, **kwargs: Any) -> str: + def exec_run(self, query: str, socket_timeout: int = 5, + delay: float = 0.5, + with_stderr: bool = False, + **kwargs: Any) -> str: """Run arbitrary shell command inside an ephemeral container. This will create a container, run the command, and then remove the @@ -309,24 +319,31 @@ class DockerWrapper(BaseModel, extra=Extra.allow): using Docker API. It effectively simulates a tty session. Args: + socket_timeout (int): The timeout for receiving from the attached stdin. + delay (int): The delay in seconds before running the command. **kwargs: Pass extra parameters to DockerClient.container.exec_run. """ # it is necessary to open stdin to keep the container running after it's started # the attach_socket will hold the connection open until the container is stopped or # the socket is closed. - # TODO!: use tty=True to be able to simulate a tty session. + # NOTE: using tty=True to be able to simulate a tty session. # NOTE: some images like python need to be launched with custom # parameters to keep stdin open. For example python image needs to be # started with the command `python3 -i` + # remove local variables from kwargs + for arg in kwargs.keys(): + if arg in locals(): + del kwargs[arg] + kwargs = {**self._params, **kwargs} if 'default_command' in kwargs: kwargs['command'] = shlex.join(kwargs['default_command']) del kwargs['default_command'] - # cmd = _get_command(query, **kwargs) + # kwargs.pop('default_command', None) # kwargs['command'] = cmd @@ -346,15 +363,16 @@ class DockerWrapper(BaseModel, extra=Extra.allow): # input() - with DockerSocket(_socket) as _socket: + with DockerSocket(_socket, timeout=socket_timeout) as _socket: # flush the output buffer (if any prompt) flush = _socket.recv() _socket.setblocking(True) - print(flush) + logger.debug(f"flushed output: {flush}") # TEST: make sure the container is ready ? use a blocking first call _socket.sendall(query.encode('utf-8')) - #FIX: is it possible to know if command is finished ? - sleep(0.5) #this should be available as a parameter + + #NOTE: delay ensures that the command is executed after the input is sent + sleep(delay) #this should be available as a parameter # read the output output = None @@ -370,16 +388,15 @@ class DockerWrapper(BaseModel, extra=Extra.allow): # output is stored in a list of tuples (stream_type, payload) df = pd.DataFrame(output, columns=['stream_type', 'payload']) - df['payload'] = df['payload'].apply(lambda x: x.decode('utf-8')) + df['payload'] = df['payload'].apply(lambda x: x.decode('utf-8')).apply(lambda x: x.strip()) df['stream_type'] = df['stream_type'].apply(lambda x: 'stdout' if x == 1 else 'stderr') payload = df.groupby('stream_type')['payload'].apply(''.join).to_dict() logger.debug(f"payload: {payload}") - #HACK: better output handling when stderr is present #NOTE: stderr might just contain the prompt - if 'stdout' in payload and 'stderr' in payload: + if 'stdout' in payload and 'stderr' in payload and with_stderr: return f"STDOUT:\n {payload['stdout']}\nSTDERR:\n {payload['stderr']}" - if 'stderr' in payload and not 'stdout' in payload: + elif 'stderr' in payload and not 'stdout' in payload: return f"STDERR: {payload['stderr']}" else: return payload['stdout']