From 8a7871ece33391046dfbf85682ece9818fe9823a Mon Sep 17 00:00:00 2001 From: blob42 Date: Fri, 3 Mar 2023 21:22:45 +0100 Subject: [PATCH] add exec_attached: attach to running container and exec cmd --- langchain/utilities/docker/__init__.py | 150 +++++++++++++++++++------ tests/unit_tests/test_docker.py | 18 +++ 2 files changed, 135 insertions(+), 33 deletions(-) diff --git a/langchain/utilities/docker/__init__.py b/langchain/utilities/docker/__init__.py index d5a97d67..0b0a41b0 100644 --- a/langchain/utilities/docker/__init__.py +++ b/langchain/utilities/docker/__init__.py @@ -1,15 +1,17 @@ """Wrapper for untrusted code exectuion on docker.""" -#TODO: attach to running container -#TODO: pull images -#TODO: embed file payloads in the call to run (in LLMChain)? -#TODO: image selection helper -#TODO: LLMChain decorator ? +# TODO: pass payload to container via filesystem +# TODO: attach to running container +# TODO: embed file payloads in the call to run (in LLMChain)? +# TODO: image selection helper +# TODO: LLMChain decorator ? import docker import struct import socket import shlex -from time import sleep +import os +import io +import tarfile import pandas as pd # type: ignore from docker.client import DockerClient # type: ignore from docker.errors import APIError, ContainerError # type: ignore @@ -94,17 +96,17 @@ class DockerSocket: # is not multiplexed. The data exchanged over the hijacked connection is # simply the raw data from the process PTY and client's stdin. - - # header := [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4} # STREAM_TYPE can be: # # 0: stdin (is written on stdout) # 1: stdout # 2: stderr - # SIZE1, SIZE2, SIZE3, SIZE4 are the four bytes of the uint32 size encoded as big endian. + # SIZE1, SIZE2, SIZE3, SIZE4 are the four bytes of the uint32 size encoded as + # big endian. # - # Following the header is the payload, which is the specified number of bytes of STREAM_TYPE.
+ # Following the header is the payload, which is the specified number of bytes of + # STREAM_TYPE. # # The simplest way to implement this protocol is the following: # @@ -187,12 +189,6 @@ class DockerWrapper(BaseModel, extra=Extra.allow): image: Union[str, Type[BaseImage]] = Field(default_factory=Shell,skip=True) from_env: Optional[bool] = Field(default=True, skip=True) - _default_params: dict = Field( - default = { - # needed to attach stdin - 'stdin_open': True, - } - ) # @property # def image_name(self) -> str: @@ -277,6 +273,13 @@ class DockerWrapper(BaseModel, extra=Extra.allow): return values + def _clean_kwargs(self, kwargs: dict) -> dict: + kwargs.pop('default_command', None) + kwargs.pop('stdin_command', None) + return kwargs + + + #FIX: default shell command should be different in run vs exec mode def run(self, query: str, **kwargs: Any) -> str: """Run arbitrary shell command inside a container. @@ -297,8 +300,7 @@ class DockerWrapper(BaseModel, extra=Extra.allow): del kwargs['image'] cmd = _get_command(query, **kwargs) - kwargs.pop('default_command', None) - kwargs.pop('stdin_command', None) + self._clean_kwargs(kwargs) args['command'] = cmd # print(f"args: {args}") @@ -327,21 +329,31 @@ class DockerWrapper(BaseModel, extra=Extra.allow): logger.debug(f"flushed output: {flush}") + def _massage_output_streams(self, output): + df = pd.DataFrame(output, columns=['stream_type', 'payload']) + df['payload'] = df['payload'].apply(lambda x: x.decode('utf-8')) + df['stream_type'] = df['stream_type'].apply(lambda x: 'stdout' if x == 1 else 'stderr') + payload = df.groupby('stream_type')['payload'].apply(''.join).to_dict() + logger.debug(f"payload: {payload}") + return payload + + # TODO: document dif between run and exec_run def exec_run(self, query: str, timeout: int = 5, delay: float = 0.5, with_stderr: bool = False, flush_prompt: bool = False, **kwargs: Any) -> str: - """Run arbitrary shell command inside an ephemeral container. 
+ """Run a shell command inside an ephemeral container. This will create a container, run the command, and then remove the container. the input is sent to the container's stdin through a socket using Docker API. It effectively simulates a tty session. Args: + query (str): The command to execute. timeout (int): The timeout for receiving from the attached stdin. - delay (int): The delay in seconds before running the command. + delay (float): The delay in seconds before running the command. with_stderr (bool): If True, the stderr will be included in the output flush_prompt (bool): If True, the prompt will be flushed before running the command. **kwargs: Pass extra parameters to DockerClient.container.exec_run. @@ -363,9 +375,10 @@ class DockerWrapper(BaseModel, extra=Extra.allow): kwargs = {**self._params, **kwargs} - kwargs.pop('default_command', None) + kwargs = self._clean_kwargs(kwargs) - # exec_run requires flags for stdin + # exec_run requires flags for stdin so we use `stdin_command` as + # a default command for creating the container if 'stdin_command' in kwargs: assert isinstance(kwargs['stdin_command'], list) kwargs['command'] = shlex.join(kwargs['stdin_command']) @@ -378,10 +391,8 @@ class DockerWrapper(BaseModel, extra=Extra.allow): # return # TODO: handle both output mode for tty=True/False - logger.debug(f"running command {kwargs['command']}") logger.debug(f"creating container with params {kwargs}") - #FIX: create container with base interactive command container = self._docker_client.containers.create(**kwargs) container.start() @@ -402,10 +413,8 @@ class DockerWrapper(BaseModel, extra=Extra.allow): _socket.sendall(raw_input) #NOTE: delay ensures that the command is executed after the input is sent - sleep(delay) #this should be available as a parameter + time.sleep(delay) #this should be available as a parameter - # read the output - output = None try: output = _socket.recv() except socket.timeout: @@ -418,15 +427,90 @@ class DockerWrapper(BaseModel, 
extra=Extra.allow): pass container.remove(force=True) + if output is None: + logger.warning("no output") + return "ERROR" # output is stored in a list of tuples (stream_type, payload) - df = pd.DataFrame(output, columns=['stream_type', 'payload']) - df['payload'] = df['payload'].apply(lambda x: x.decode('utf-8')) - df['stream_type'] = df['stream_type'].apply(lambda x: 'stdout' if x == 1 else 'stderr') - payload = df.groupby('stream_type')['payload'].apply(''.join).to_dict() - logger.debug(f"payload: {payload}") + payload = self._massage_output_streams(output) + + + #NOTE: stderr might contain only the prompt + if 'stdout' in payload and 'stderr' in payload and with_stderr: + return f"STDOUT:\n {payload['stdout'].strip()}\nSTDERR:\n {payload['stderr']}" + elif 'stderr' in payload and not 'stdout' in payload: + return f"STDERR: {payload['stderr']}" + else: + return payload['stdout'].strip() + + + def exec_attached(self, query: str, container: str, + delay: float = 0.5, + timeout: int = 5, + with_stderr: bool = False, + flush_prompt: bool = False, + **kwargs: Any) -> str: + """Attach to container and exec query on it. + + This method is very similar to exec_run. It only differs in that it attaches to + an already specified container instead of creating a new one for each query. + + Args: + query (str): The command to execute. + container (str): The container to attach to. + timeout (int): The timeout for receiving from the attached stdin. + delay (float): The delay in seconds before running the command. + with_stderr (bool): If True, the stderr will be included in the output + flush_prompt (bool): If True, the prompt will be flushed before running the command. + **kwargs: Pass extra parameters to DockerClient.container.exec_run.
+ + """ + + # remove local variables from kwargs + for arg in kwargs.keys(): + if arg in locals(): + del kwargs[arg] + + + kwargs = {**self._params, **kwargs} + kwargs = self._clean_kwargs(kwargs) + + logger.debug(f"attaching to container {container} with params {kwargs}") + + try: + container = self._docker_client.containers.get(container) + except Exception as e: + logger.error(f"container {container}: {e}") + return "ERROR" + + _socket = container.attach_socket(params={'stdout': 1, 'stderr': 1, + 'stdin': 1, 'stream': 1}) + + + with DockerSocket(_socket, timeout=timeout) as _socket: + # flush the output buffer (if any prompt) + if flush_prompt: + self._flush_prompt(_socket) + + raw_input = f"{query}\n".encode('utf-8') + _socket.sendall(raw_input) + + #NOTE: delay ensures that the command is executed after the input is sent + time.sleep(delay) #this should be available as a parameter + + try: + output = _socket.recv() + except socket.timeout: + return "ERROR: timeout" + + if output is None: + logger.warning("no output") + return "ERROR" + + payload = self._massage_output_streams(output) + print(payload) - #NOTE: stderr might just contain the prompt + #NOTE: stderr might contain only the prompt if 'stdout' in payload and 'stderr' in payload and with_stderr: return f"STDOUT:\n {payload['stdout'].strip()}\nSTDERR:\n {payload['stderr']}" elif 'stderr' in payload and not 'stdout' in payload: diff --git a/tests/unit_tests/test_docker.py b/tests/unit_tests/test_docker.py index 58d4bdfc..c5224ad0 100644 --- a/tests/unit_tests/test_docker.py +++ b/tests/unit_tests/test_docker.py @@ -7,6 +7,7 @@ from langchain.utilities.docker import DockerWrapper, \ gvisor_runtime_available, _default_params from unittest.mock import MagicMock import subprocess +import time def docker_installed() -> bool: @@ -84,6 +85,23 @@ class TestDockerUtility: mock_client.info.return_value = {'Runtimes': {'runc': {'path': 'runc'}}} assert not gvisor_runtime_available(mock_client) + def 
test_exec_attached(self) -> None: + """Test exec with attached mode.""" + # create a test container + d = DockerWrapper() + cont = d._docker_client.containers.run('alpine', '/bin/sh -s', + detach=True, + stdin_open=True) + cont.start() + # make sure the prompt is ready + time.sleep(1) + out = d.exec_attached("cat /etc/os-release", container=cont.id) + assert out.find('alpine') != -1 + cont.kill() + cont.remove(force=True) + + + @pytest.mark.skipif(not gvisor_installed(), reason="gvisor not installed") def test_run_with_runtime_runsc(self) -> None: