diff --git a/docs/modules/utils/examples/docker.ipynb b/docs/modules/utils/examples/docker.ipynb
new file mode 100644
index 00000000..e328dd39
--- /dev/null
+++ b/docs/modules/utils/examples/docker.ipynb
@@ -0,0 +1,77 @@
+{
+  "cells": [
+    {
+      "cell_type": "code",
+      "metadata": {
+        "jukit_cell_id": "O4HPx3boF0"
+      },
+      "source": [],
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "jukit_cell_id": "hqQkbPEwTJ"
+      },
+      "source": [
+        "# Using the DockerWrapper utility"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "jukit_cell_id": "vCepuypaFH"
+      },
+      "source": [
+        "from langchain.utilities.docker import DockerWrapper, DockerSocket"
+      ],
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "jukit_cell_id": "BtYVqy2YtO"
+      },
+      "source": [
+        "d = DockerWrapper()\n",
+        "query = \"\"\"\n",
+        "for i in $(seq 1 10)\n",
+        "do\n",
+        "  echo $i\n",
+        "done\n",
+        "\"\"\""
+      ],
+      "outputs": [],
+      "execution_count": null
+    },
+    {
+      "cell_type": "code",
+      "metadata": {
+        "jukit_cell_id": "ELWWm03ptQ"
+      },
+      "source": [
+        "print(d.exec_run(query, \"alpine\"))"
+      ],
+      "outputs": [
+        {
+          "output_type": "stream",
+          "name": "stdout",
+          "text": "[header] blocking IO\n{'stdout': '1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n10\\n'}\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n\n"
+        }
+      ],
+      "execution_count": 1
+    }
+  ],
+  "metadata": {
+    "anaconda-cloud": {},
+    "kernelspec": {
+      "display_name": "python",
+      "language": "python",
+      "name": "python3"
+    }
+  },
+  "nbformat": 4,
+  "nbformat_minor": 4
+}
\ No newline at end of file
diff --git a/langchain/utilities/docker.py b/langchain/utilities/docker.py
index 9d44416b..1b17023a 100644
--- a/langchain/utilities/docker.py
+++ b/langchain/utilities/docker.py
@@ -2,8 +2,14 @@
 # TODO: Validation:
 # - verify gVisor runtime (runsc) if available
 # - pass arbitrary image names
+# - embed file payloads in the call to run (in LLMChain)?
+# - image selection helper
+# - LLMChain decorator ?
 
 import docker
+import struct
+import time
+import pandas as pd
 from docker.client import DockerClient  # type: ignore
 from docker.errors import APIError, ContainerError
@@ -11,6 +17,101 @@ from typing import Any, Dict
 from typing import Optional
 from pydantic import BaseModel, PrivateAttr, Extra, root_validator, validator
 
+docker_images = {
+    "default": "alpine:{version}",
+    "python": "python:{version}",
+}
+
+SOCK_BUF_SIZE = 1024
+
+
+class DockerSocket:
+    """Wrapper around the Docker API's socket object. Can be used as a context manager."""
+
+    def __init__(self, socket):
+        self.socket = socket
+        self.socket._sock.setblocking(False)
+
+    def __enter__(self):
+        print("context enter")
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        print("context exit")
+        self.close()
+
+    def close(self):
+        self.socket._sock.shutdown(2)  # 2 = SHUT_RDWR
+        self.socket._sock.close()
+        self.socket.close()
+
+    def sendall(self, data: bytes) -> None:
+        self.socket._sock.sendall(data)
+
+    def recv(self):
+        """Wrapper for socket.recv that does a buffered read."""
+
+        # header := [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}
+        # STREAM_TYPE can be:
+        #
+        #   0: stdin (is written on stdout)
+        #   1: stdout
+        #   2: stderr
+        # SIZE1, SIZE2, SIZE3, SIZE4 are the four bytes of the uint32 size encoded as big endian.
+        #
+        # Following the header is the payload, which is the specified number of bytes of STREAM_TYPE.
+        #
+        # The simplest way to implement this protocol is the following:
+        #
+        # - Read 8 bytes.
+        # - Choose stdout or stderr depending on the first byte.
+        # - Extract the frame size from the last four bytes.
+        # - Read the extracted size and output it on the correct output.
+        # - Goto 1.
+
+        chunks = []
+        # strip the header
+        # try:
+        #     self.socket._sock.recv(8)
+        # except BlockingIOError as e:
+        #     raise ValueError("incomplete read from container output")
+
+        while True:
+            header = b''
+            try:
+                header = self.socket._sock.recv(8)
+            except BlockingIOError:
+                print("[header] blocking IO")
+                break
+
+            if header == b'':
+                break
+            stream_type, size = struct.unpack("!BxxxI", header)
+
+            payload = b''
+            while size:
+                chunk = b''
+                try:
+                    chunk = self.socket._sock.recv(min(size, SOCK_BUF_SIZE))
+                except BlockingIOError:
+                    print("[body] blocking IO")
+                    break
+                if chunk == b'':
+                    raise ValueError("incomplete read from container output")
+                payload += chunk
+                size -= len(chunk)
+            chunks.append((stream_type, payload))
+            # try:
+            #     msg = self.socket._sock.recv(SOCK_BUF_SIZE)
+            #     chunk += msg
+            # except BlockingIOError as e:
+            #     break
+
+        return chunks
+
+
+
 class DockerWrapper(BaseModel, extra=Extra.forbid):
     """Executes arbitrary payloads and returns the output."""
@@ -52,10 +153,71 @@ class DockerWrapper(BaseModel, extra=Extra.forbid):
         """
         try:
-            image = getattr(kwargs, "image", self.image)
+            image = kwargs.get("image", self.image)
+            kwargs.pop("image", None)  # avoid passing image to run() twice
             return self._docker_client.containers.run(image,
                                                       query,
-                                                      remove=True)
+                                                      remove=True,
+                                                      **kwargs)
         except ContainerError as e:
             return f"STDERR: {e}"
+        # TODO: handle docker APIError ?
+        except APIError as e:
+            print(f"APIError: {e}")
+            return "ERROR"
+
+    def exec_run(self, query: str, image: str) -> str:
+        """Run an arbitrary shell command inside a container.
+
+        This is a lower-level API that sends the input to the container's
+        stdin through a socket using the Docker API.
+        """
+        # It is necessary to open stdin to keep the container running after it's started;
+        # attach_socket will hold the connection open until the container is stopped or
+        # the socket is closed.
+        container = self._docker_client.containers.create(image, stdin_open=True)
+        container.start()
+        # input()
+
+        # get the underlying socket
+        _socket = container.attach_socket(params={'stdout': 1, 'stderr': 1, 'stdin': 1, 'stream': 1})
+        output = None
+
+        socket = DockerSocket(_socket)
+        socket.sendall(query.encode('utf-8'))
+        time.sleep(2)
+        # FIX: how to make sure that the container is done executing the command?
+        # input()
+
+        # read the output
+        output = socket.recv()
+        # print(output)
+
+        container.kill()
+        container.remove()
+
+        # the output is stored as a list of (stream_type, payload) tuples
+        df = pd.DataFrame(output, columns=['stream_type', 'payload'])
+        df['payload'] = df['payload'].apply(lambda x: x.decode('utf-8'))
+        df['stream_type'] = df['stream_type'].apply(lambda x: 'stdout' if x == 1 else 'stderr')
+        payload = df.groupby('stream_type')['payload'].apply(''.join).to_dict()
+        print(payload)
+
+        if 'stdout' in payload and 'stderr' in payload:
+            return f"STDOUT:\n {payload['stdout']}\nSTDERR: {payload['stderr']}"
+        elif 'stderr' in payload:
+            return f"STDERR: {payload['stderr']}"
+        else:
+            return payload['stdout']
+
+
+def _exec_run_stdin(input):
+    """Pipes the input data to a container.
+
+    input should be an object with a `read` method that returns bytes.
+    """
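Note on the attach-stream framing that `DockerSocket.recv` parses: the 8-byte header carries the stream type in the first byte and a big-endian uint32 payload length in the last four bytes. The standalone sketch below is not part of the patch; it only uses the standard-library `struct` module to pack and unpack one frame with the same `"!BxxxI"` format string, which may help when reviewing the demultiplexing loop above.

```python
import struct

STDOUT_STREAM = 1  # 0 = stdin, 1 = stdout, 2 = stderr

# Build one multiplexed frame the way the Docker attach stream encodes it:
# 1 byte stream type, 3 padding bytes, big-endian uint32 payload size, then the payload.
payload = b"hello from the container\n"
frame = struct.pack("!BxxxI", STDOUT_STREAM, len(payload)) + payload

# Decoding mirrors DockerSocket.recv: read the 8-byte header, then read `size` bytes.
stream_type, size = struct.unpack("!BxxxI", frame[:8])
body = frame[8:8 + size]

assert stream_type == STDOUT_STREAM
assert body == payload
print(stream_type, size, body.decode("utf-8"))
```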
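Review note on the aggregation step in `exec_run`: pulling in pandas just to group the `(stream_type, payload)` tuples into stdout/stderr strings is a heavy dependency for that job. A standard-library equivalent of the same grouping is sketched below; it is illustrative only, and the helper name `group_output` is not part of the patch.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

def group_output(chunks: List[Tuple[int, bytes]]) -> Dict[str, str]:
    """Collapse (stream_type, payload) tuples into stdout/stderr strings.

    Mirrors the DataFrame groupby in exec_run: stream type 1 is stdout,
    everything else is treated as stderr.
    """
    grouped: Dict[str, List[str]] = defaultdict(list)
    for stream_type, payload in chunks:
        key = "stdout" if stream_type == 1 else "stderr"
        grouped[key].append(payload.decode("utf-8"))
    return {key: "".join(parts) for key, parts in grouped.items()}

# Example with synthetic frames such as DockerSocket.recv would return:
print(group_output([(1, b"1\n2\n"), (2, b"warning\n"), (1, b"3\n")]))
# {'stdout': '1\n2\n3\n', 'stderr': 'warning\n'}
```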