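"""Tests for petals.server.memory_cache.MemoryCache: allocation timeouts and cross-process cache usage."""
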
import asyncio
import multiprocessing as mp
import random
import time
from typing import Optional

import pytest
import pytest_asyncio  # make sure the module exists; otherwise the test will be skipped
import torch
from hivemind import TensorDescriptor

from petals.server.memory_cache import AllocationFailed, MemoryCache
from petals.utils.misc import get_size_in_bytes

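# Helper: describe a 1-D tensor of roughly `num_bytes` bytes, using the given dtype or a randomly chosen one.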
def _make_tensor_descriptor(num_bytes: int, dtype: Optional[torch.dtype] = None):
    if dtype is None:
        dtype = random.choice((torch.int64, torch.int8, torch.uint8, torch.float32, torch.bfloat16, torch.bool))
    elem_size_bytes = get_size_in_bytes(dtype)
    descr = TensorDescriptor.from_tensor(torch.empty((num_bytes // elem_size_bytes,), dtype=dtype))
    return descr


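# Checks that allocate_cache() respects per-call timeouts as well as the cache-wide max_alloc_timeout.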
@pytest.mark.asyncio
async def test_cache_timeout():
    cache = MemoryCache(max_size_bytes=1024, max_alloc_timeout=0.5)
    cache.runtime_pid += 1  # pretend we're another process
    async with cache.allocate_cache(_make_tensor_descriptor(768), timeout=0):
        pass

    async with cache.allocate_cache(_make_tensor_descriptor(100), timeout=999):
        async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0):
            async with cache.allocate_cache(_make_tensor_descriptor(128), _make_tensor_descriptor(32), timeout=1):
                t_start = time.perf_counter()
                with pytest.raises(AllocationFailed):
                    async with cache.allocate_cache(_make_tensor_descriptor(768), timeout=0.1):
                        pass
                assert 0.1 < time.perf_counter() - t_start < 0.2, "wait time exceeds alloc timeout"
                async with cache.allocate_cache(_make_tensor_descriptor(128), timeout=float("inf")):
                    pass

                t_start = time.perf_counter()
                with pytest.raises(AllocationFailed):
                    async with cache.allocate_cache(_make_tensor_descriptor(384), timeout=1.0):  # exceeds max timeout
                        pass
                assert 0.5 < time.perf_counter() - t_start < 0.6, "wait time exceeds max alloc timeout"

                # test memory allocation when another task frees the memory
                async def _klog_the_cache():
                    async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0.2):
                        pass

                large_alloc_task = asyncio.create_task(_klog_the_cache())

                t_start = time.perf_counter()
                await asyncio.sleep(0.05)  # wait for large alloc to enqueue
                async with cache.allocate_cache(_make_tensor_descriptor(128), timeout=float("inf")):  # exceeds max timeout
                    pass  # this memory should allocate once the background task clears the queue
                assert 0.2 < time.perf_counter() - t_start < 0.3, "memory should be allocated after background task clears"
                with pytest.raises(AllocationFailed):
                    await large_alloc_task

                # test that zero-timeout allocation fails instantaneously even if someone else is awaiting alloc
                large_alloc_task = asyncio.create_task(_klog_the_cache())
                t_start = time.perf_counter()
                await asyncio.sleep(0.05)  # wait for large alloc to enqueue
                with pytest.raises(AllocationFailed):
                    async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0):
                        pass  # a zero-timeout allocation must fail right away rather than wait in the queue
                assert time.perf_counter() - t_start < 0.1, "zero-timeout task should fail (or succeed) instantaneously"
                with pytest.raises(AllocationFailed):
                    await large_alloc_task


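# Checks that an allocation with timeout=float("inf") waits until a concurrent task frees enough memory.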
@pytest.mark.asyncio
async def test_unlimited_timeout():
    cache = MemoryCache(max_size_bytes=1024)
    cache.runtime_pid += 1  # pretend we're another process
    t_start = time.perf_counter()

    async def _klog_the_cache():
        async with cache.allocate_cache(_make_tensor_descriptor(512), timeout=0.2):
            await asyncio.sleep(0.5)

    alloc_task = asyncio.create_task(_klog_the_cache())
    await asyncio.sleep(0.1)
    async with cache.allocate_cache(_make_tensor_descriptor(768), timeout=float("inf")):
        await alloc_task
    assert 0.5 < time.perf_counter() - t_start < 0.6, "memory should be allocated after background task clears"


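# Checks cross-process cache usage: two forked worker processes allocate tensors, while the owner process
# reads and writes them via use_cache() and tracks current_size_bytes as handles are allocated and freed.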
@pytest.mark.asyncio
async def test_cache_usage():
    cache = MemoryCache(max_size_bytes=2048)
    alloc_event, dealloc_a_event, dealloc_bcd_event, dealloc_e_event, dealloc_f_event = (mp.Event() for _ in range(5))
    pipe_receiver, pipe_sender = mp.Pipe(duplex=False)
    with pytest.raises(AssertionError):
        async with cache.allocate_cache(_make_tensor_descriptor(123), timeout=1):
            pass  # fails because cache must be allocated from another process

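    # descriptor sizes are chosen so that a + b + c + d (809 bytes) fit into the 2048-byte cache,
    # while e (1536 bytes) and f (1792 bytes) have to wait for earlier allocations to be freed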
    descr_a = TensorDescriptor.from_tensor(torch.empty(768, dtype=torch.uint8))  # 768 bytes
    descr_b = TensorDescriptor.from_tensor(torch.empty((), dtype=torch.float64))  # 8 bytes
    descr_c = TensorDescriptor.from_tensor(torch.empty((33,), dtype=torch.bool))  # 33 bytes
    descr_d = TensorDescriptor.from_tensor(torch.empty((0,), dtype=torch.int64))  # 0 bytes
    descr_e = TensorDescriptor.from_tensor(torch.empty((96, 8), dtype=torch.bfloat16))  # 1536 bytes
    descr_f = TensorDescriptor.from_tensor(torch.empty((1792,), dtype=torch.uint8))  # 1792 bytes

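    # worker coroutine: allocate the given descriptors, send the handles to the owner process,
    # then hold the memory until the corresponding dealloc event is set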
    async def _allocate_and_wait(dealloc_event, *descrs, timeout=None):
        loop = asyncio.get_event_loop()
        async with cache.allocate_cache(*descrs, timeout=timeout) as handles:
            pipe_sender.send(handles)
            await loop.run_in_executor(None, dealloc_event.wait)

    async def _allocate_af():
        alloc_event.wait()
        allocate_a_task = asyncio.create_task(_allocate_and_wait(dealloc_a_event, descr_a))
        await allocate_a_task
        allocate_f_task = asyncio.create_task(_allocate_and_wait(dealloc_f_event, descr_f))  # klogs the cache
        await allocate_f_task

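    # the workers run in separate forked processes, since allocations must not come from the cache owner's process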
    alloc_process1 = mp.context.ForkProcess(target=lambda: asyncio.run(_allocate_af()), daemon=True)
    alloc_process1.start()

    async def _allocate_bcde():
        alloc_event.wait()
        await asyncio.sleep(0.1)  # ensure that the other tensor is always allocated (and sent through pipe) first
        allocate_bcd_task = asyncio.create_task(_allocate_and_wait(dealloc_bcd_event, descr_b, descr_c, descr_d))
        allocate_e_task = asyncio.create_task(_allocate_and_wait(dealloc_e_event, descr_e))  # doesn't fit
        await asyncio.wait({allocate_e_task, allocate_bcd_task}, return_when=asyncio.ALL_COMPLETED)

    alloc_process2 = mp.context.ForkProcess(target=lambda: asyncio.run(_allocate_bcde()), daemon=True)
    alloc_process2.start()
    assert cache.current_size_bytes == 0
    alloc_event.set()
    (handle_a,) = pipe_receiver.recv()

    handle_b, handle_c, handle_d = pipe_receiver.recv()

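    # back in the owner process: map the received handles to tensors and check the cache bookkeeping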
    with cache.use_cache(handle_a) as (tensor_a,):
        assert tensor_a.dtype == torch.uint8
        tensor_a[2:5] = torch.tensor((42, 43, 44))

    with cache.use_cache(handle_a, handle_b, handle_d) as (tensor_a, tensor_b, tensor_d):
        assert tensor_b.dtype == torch.float64 and tensor_b.numel() == 1 and tensor_b.ndim == 0
        assert tensor_d.dtype == torch.int64 and tensor_d.numel() == 0
        tensor_a += 1
        tensor_b[...] = -1.337
    assert cache.current_size_bytes == 809  # this checks that a, b, c, d are allocated but e still awaits memory

    dealloc_bcd_event.set()
    await asyncio.sleep(0.1)
    assert cache.current_size_bytes == 768  # only tensor a should be allocated
    with pytest.raises(KeyError):
        with cache.use_cache(handle_a, handle_b):
            pass  # one of the requested handles (b) is already deallocated
    with pytest.raises(KeyError):
        with cache.use_cache(handle_d):
            pass  # handle_d is deallocated correctly, even though it is never used
    with cache.use_cache(handle_a) as (tensor_a,):
        assert tuple(tensor_a[2:5]) == (43, 44, 45)

    dealloc_a_event.set()
    (handle_e,) = pipe_receiver.recv()  # e can finally be allocated
    await asyncio.sleep(0.1)
    assert cache.current_size_bytes == 1536  # tensor e should finally be able to allocate

    with pytest.raises(KeyError):
        with cache.use_cache(handle_a):
            pass  # tensor a is no longer allocated
    with cache.use_cache(handle_e) as (tensor_e,):
        assert tensor_e.dtype == torch.bfloat16 and tensor_e.shape == (96, 8)

    dealloc_e_event.set()
    await asyncio.sleep(0.1)
    assert cache.current_size_bytes == 1792  # only tensor f is still allocated
    dealloc_f_event.set()

    alloc_process1.join()
    alloc_process2.join()
    await asyncio.sleep(0.1)
    assert cache.current_size_bytes == 0
    assert alloc_process1.exitcode == 0, "allocation process 1 failed or did not finish, see stderr for details"
    assert alloc_process2.exitcode == 0, "allocation process 2 failed or did not finish, see stderr for details"