fix run and exec_run default commands, actually use gVisor

- run and exec_run need a separate default command. Run usually executes
  a script while exec_run simulates an interactive session. The image
  templates and run funcs have been upgraded to handle both
  types of commands.

- test: make docker tests run when docker is installed and docker lib
  avaialble.
  - test that runsc runtime is used by default when gVisor is installed.
    (manually removing gVisor skips the test)
docker-utility
blob42 1 year ago
parent 149fe0055e
commit 201ecdc9ee

@ -158,11 +158,10 @@ class DockerSocket:
return chunks
def _default_params() -> Dict:
return {
# the only required parameter to be able to attach.
'stdin_open': True,
}
_default_params = {
# the only required parameter to be able to attach.
'stdin_open': True,
}
def _get_command(query: str, **kwargs: Dict) -> str:
"""Build an escaped command from a query string and keyword arguments."""
@ -172,6 +171,7 @@ def _get_command(query: str, **kwargs: Dict) -> str:
return cmd
#TEST: gVisor
class DockerWrapper(BaseModel, extra=Extra.allow):
"""Executes arbitrary payloads and returns the output.
@ -187,6 +187,12 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
image: Union[str, Type[BaseImage]] = Field(default_factory=Shell,skip=True)
from_env: Optional[bool] = Field(default=True, skip=True)
_default_params: dict = Field(
default = {
# needed to attach stdin
'stdin_open': True,
}
)
# @property
# def image_name(self) -> str:
@ -199,6 +205,10 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
if self.from_env:
self._docker_client = docker.from_env()
if gvisor_runtime_available(docker.from_env()):
self._params['runtime'] = 'runsc'
# if not isinstance(self.image, str) and issubclass(self.image, BaseImage):
# self._params = {**self._params, **self.image().dict()}
@ -244,7 +254,7 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
_image = get_image_template(image)
if isinstance(_image, str):
# user wants a custom image, we should use default params
values["_params"] = {**_default_params(), **{'image': image}}
values["_params"] = {**_default_params, **{'image': image}}
else:
# user wants a pre registered image, we should use the image params
values["_params"] = _image().dict()
@ -267,6 +277,7 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
return values
#FIX: default shell command should be different in run vs exec mode
def run(self, query: str, **kwargs: Any) -> str:
"""Run arbitrary shell command inside a container.
@ -287,6 +298,7 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
del kwargs['image']
cmd = _get_command(query, **kwargs)
kwargs.pop('default_command', None)
kwargs.pop('stdin_command', None)
args['command'] = cmd
# print(f"args: {args}")
@ -351,9 +363,13 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
kwargs = {**self._params, **kwargs}
if 'default_command' in kwargs:
kwargs['command'] = shlex.join([*kwargs['default_command'],query])
del kwargs['default_command']
kwargs.pop('default_command', None)
# exec_run requires flags for stdin
if 'stdin_command' in kwargs:
assert isinstance(kwargs['stdin_command'], list)
kwargs['command'] = shlex.join(kwargs['stdin_command'])
del kwargs['stdin_command']
# kwargs.pop('default_command', None)
# kwargs['command'] = cmd
@ -364,6 +380,8 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
# TODO: handle both output mode for tty=True/False
logger.debug(f"running command {kwargs['command']}")
logger.debug(f"creating container with params {kwargs}")
#FIX: create container with base interactive command
container = self._docker_client.containers.create(**kwargs)
container.start()
@ -403,15 +421,15 @@ class DockerWrapper(BaseModel, extra=Extra.allow):
# output is stored in a list of tuples (stream_type, payload)
df = pd.DataFrame(output, columns=['stream_type', 'payload'])
df['payload'] = df['payload'].apply(lambda x: x.decode('utf-8')).apply(lambda x: x.strip())
df['payload'] = df['payload'].apply(lambda x: x.decode('utf-8'))
df['stream_type'] = df['stream_type'].apply(lambda x: 'stdout' if x == 1 else 'stderr')
payload = df.groupby('stream_type')['payload'].apply(''.join).to_dict()
logger.debug(f"payload: {payload}")
#NOTE: stderr might just contain the prompt
if 'stdout' in payload and 'stderr' in payload and with_stderr:
return f"STDOUT:\n {payload['stdout']}\nSTDERR:\n {payload['stderr']}"
return f"STDOUT:\n {payload['stdout'].strip()}\nSTDERR:\n {payload['stderr']}"
elif 'stderr' in payload and not 'stdout' in payload:
return f"STDERR: {payload['stderr']}"
else:
return payload['stdout']
return payload['stdout'].strip()

@ -13,16 +13,14 @@ class BaseImage(BaseModel, extra=Extra.forbid):
name: str
tag: Optional[str] = 'latest'
default_command: Optional[List[str]] = None
stdin_command: Optional[List[str]] = None
def dict(self, *args, **kwargs):
"""Override the dict method to add the image name."""
d = super().dict(*args, **kwargs)
del d['name']
del d['tag']
# del d['default_command']
d['image'] = self.image_name
# if self.default_command:
# d['command'] = self.default_command
return d
@property
@ -46,16 +44,23 @@ class Shell(BaseImage):
or by passing the full path to the shell binary.
"""
name: str = 'alpine'
default_command: List[str] = [ShellTypes.sh.value, '-s']
default_command: List[str] = [ShellTypes.sh.value, '-c']
stdin_command: List[str] = [ShellTypes.sh.value, '-i']
@validator('default_command')
def validate_shell(cls, value: str) -> str:
def validate_default_command(cls, value: str) -> str:
"""Validate shell type."""
val = getattr(ShellTypes, value, None)
if val:
return val.value
return value
@validator('stdin_command')
def validate_stdin_command(cls, value: str) -> str:
"""Validate shell type."""
val = getattr(ShellTypes, value, None)
if val:
return val.value
# elif value in [v.value for v in list(ShellTypes.__members__.values())]:
# print(f"docker: overriding shell binary to: {value}")
return value
# example using base image to construct python image
@ -66,13 +71,8 @@ class Python(BaseImage):
stdin open.
"""
name: str = 'python'
default_command: List[str] = ['python3', '-iq']
def __setattr__(self, name, value):
if name == 'default_command':
raise AttributeError(f'running this image with {self.default_command}'
' is necessary to keep stdin open.')
super().__setattr__(name, value)
default_command: List[str] = ['python3', '-c']
stdin_command: List[str] = ['python3', '-iq']
def get_image_template(image_name: str = 'shell') -> Union[str, Type[BaseImage]]:

@ -1,6 +1,8 @@
"""Test the docker wrapper utility."""
import pytest
import importlib
from typing import Any
from langchain.utilities.docker import DockerWrapper, \
gvisor_runtime_available, _default_params
from unittest.mock import MagicMock
@ -17,11 +19,31 @@ def docker_installed() -> bool:
return True
def gvisor_installed() -> bool:
"""return true if gvisor local runtime is installed"""
try:
docker_lib = importlib.import_module('docker')
client = docker_lib.from_env()
return gvisor_runtime_available(client)
except ImportError:
return False
return False
def docker_lib_installed() -> bool:
return importlib.util.find_spec('docker') is not None
@pytest.mark.skipif(not docker_installed(), reason="docker not installed")
def skip_docker_tests() -> bool:
return not docker_installed() or not docker_lib_installed()
@pytest.mark.skipif(skip_docker_tests(), reason="docker not installed")
class TestDockerUtility:
def test_default_image(self) -> None:
"""Test running a command with the default alpine image."""
docker = DockerWrapper()
@ -62,6 +84,14 @@ class TestDockerUtility:
mock_client.info.return_value = {'Runtimes': {'runc': {'path': 'runc'}}}
assert not gvisor_runtime_available(mock_client)
@pytest.mark.skipif(not gvisor_installed(), reason="gvisor not installed")
def test_run_with_runtime_runsc(self) -> None:
docker = DockerWrapper(image='shell')
output = docker.run('dmesg')
assert output.find('gVisor') != -1
def test_socket_read_timeout(self) -> None:
"""Test socket read timeout."""
docker = DockerWrapper(image='python', default_command=['python'])

Loading…
Cancel
Save