mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-11-05 00:01:00 +00:00
126496d3ca
Add tests for providers with auth Improve async support / 2x faster Shared get_cookies by domain function
115 lines
2.9 KiB
Python
115 lines
2.9 KiB
Python
from abc import ABC, abstractmethod
|
|
|
|
from ..typing import Any, CreateResult, AsyncGenerator, Union
|
|
|
|
import browser_cookie3
|
|
import asyncio
|
|
from time import time
|
|
import math
|
|
|
|
class BaseProvider(ABC):
|
|
url: str
|
|
working = False
|
|
needs_auth = False
|
|
supports_stream = False
|
|
supports_gpt_35_turbo = False
|
|
supports_gpt_4 = False
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def create_completion(
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
stream: bool,
|
|
**kwargs: Any,
|
|
) -> CreateResult:
|
|
raise NotImplementedError()
|
|
|
|
@classmethod
|
|
@property
|
|
def params(cls):
|
|
params = [
|
|
("model", "str"),
|
|
("messages", "list[dict[str, str]]"),
|
|
("stream", "bool"),
|
|
]
|
|
param = ", ".join([": ".join(p) for p in params])
|
|
return f"g4f.provider.{cls.__name__} supports: ({param})"
|
|
|
|
|
|
_cookies = {}
|
|
|
|
def get_cookies(cookie_domain: str) -> dict:
|
|
if cookie_domain not in _cookies:
|
|
_cookies[cookie_domain] = {}
|
|
for cookie in browser_cookie3.load(cookie_domain):
|
|
_cookies[cookie_domain][cookie.name] = cookie.value
|
|
return _cookies[cookie_domain]
|
|
|
|
|
|
class AsyncProvider(BaseProvider):
|
|
@classmethod
|
|
def create_completion(
|
|
cls,
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
stream: bool = False,
|
|
**kwargs: Any
|
|
) -> CreateResult:
|
|
yield asyncio.run(cls.create_async(model, messages, **kwargs))
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
async def create_async(
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
**kwargs: Any,
|
|
) -> str:
|
|
raise NotImplementedError()
|
|
|
|
|
|
class AsyncGeneratorProvider(AsyncProvider):
|
|
@classmethod
|
|
def create_completion(
|
|
cls,
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
stream: bool = True,
|
|
**kwargs: Any
|
|
) -> CreateResult:
|
|
if stream:
|
|
yield from run_generator(cls.create_async_generator(model, messages, **kwargs))
|
|
else:
|
|
yield from AsyncProvider.create_completion(cls=cls, model=model, messages=messages, **kwargs)
|
|
|
|
@classmethod
|
|
async def create_async(
|
|
cls,
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
**kwargs: Any,
|
|
) -> str:
|
|
chunks = [chunk async for chunk in cls.create_async_generator(model, messages, **kwargs)]
|
|
if chunks:
|
|
return "".join(chunks)
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def create_async_generator(
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
) -> AsyncGenerator:
|
|
raise NotImplementedError()
|
|
|
|
|
|
def run_generator(generator: AsyncGenerator[Union[Any, str], Any]):
|
|
loop = asyncio.new_event_loop()
|
|
gen = generator.__aiter__()
|
|
|
|
while True:
|
|
try:
|
|
yield loop.run_until_complete(gen.__anext__())
|
|
|
|
except StopAsyncIteration:
|
|
break
|