mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-11-10 19:11:01 +00:00
5756586cde
Add doctypes to many functions Add file upload for text files Add alternative url to FreeChatgpt Add webp to allowed image types
150 lines
4.7 KiB
Python
150 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import random
|
|
import secrets
|
|
import string
|
|
from asyncio import AbstractEventLoop, BaseEventLoop
|
|
from platformdirs import user_config_dir
|
|
from browser_cookie3 import (
|
|
chrome, chromium, opera, opera_gx,
|
|
brave, edge, vivaldi, firefox,
|
|
_LinuxPasswordManager, BrowserCookieError
|
|
)
|
|
from ..typing import Dict, Messages
|
|
from .. import debug
|
|
|
|
# Global variable to store cookies
|
|
_cookies: Dict[str, Dict[str, str]] = {}
|
|
|
|
def get_event_loop() -> AbstractEventLoop:
|
|
"""
|
|
Get the current asyncio event loop. If the loop is closed or not set, create a new event loop.
|
|
If a loop is running, handle nested event loops. Patch the loop if 'nest_asyncio' is installed.
|
|
|
|
Returns:
|
|
AbstractEventLoop: The current or new event loop.
|
|
"""
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
if isinstance(loop, BaseEventLoop):
|
|
loop._check_closed()
|
|
except RuntimeError:
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
try:
|
|
asyncio.get_running_loop()
|
|
if not hasattr(loop.__class__, "_nest_patched"):
|
|
import nest_asyncio
|
|
nest_asyncio.apply(loop)
|
|
except RuntimeError:
|
|
pass
|
|
except ImportError:
|
|
raise RuntimeError(
|
|
'Use "create_async" instead of "create" function in a running event loop. Or install "nest_asyncio" package.'
|
|
)
|
|
return loop
|
|
|
|
if os.environ.get('DBUS_SESSION_BUS_ADDRESS') == "/dev/null":
|
|
_LinuxPasswordManager.get_password = lambda a, b: b"secret"
|
|
|
|
def get_cookies(domain_name: str = '') -> Dict[str, str]:
|
|
"""
|
|
Load cookies for a given domain from all supported browsers and cache the results.
|
|
|
|
Args:
|
|
domain_name (str): The domain for which to load cookies.
|
|
|
|
Returns:
|
|
Dict[str, str]: A dictionary of cookie names and values.
|
|
"""
|
|
if domain_name in _cookies:
|
|
return _cookies[domain_name]
|
|
|
|
cookies = _load_cookies_from_browsers(domain_name)
|
|
_cookies[domain_name] = cookies
|
|
return cookies
|
|
|
|
def _load_cookies_from_browsers(domain_name: str) -> Dict[str, str]:
|
|
"""
|
|
Helper function to load cookies from various browsers.
|
|
|
|
Args:
|
|
domain_name (str): The domain for which to load cookies.
|
|
|
|
Returns:
|
|
Dict[str, str]: A dictionary of cookie names and values.
|
|
"""
|
|
cookies = {}
|
|
for cookie_fn in [_g4f, chrome, chromium, opera, opera_gx, brave, edge, vivaldi, firefox]:
|
|
try:
|
|
cookie_jar = cookie_fn(domain_name=domain_name)
|
|
if len(cookie_jar) and debug.logging:
|
|
print(f"Read cookies from {cookie_fn.__name__} for {domain_name}")
|
|
for cookie in cookie_jar:
|
|
if cookie.name not in cookies:
|
|
cookies[cookie.name] = cookie.value
|
|
except BrowserCookieError:
|
|
pass
|
|
except Exception as e:
|
|
if debug.logging:
|
|
print(f"Error reading cookies from {cookie_fn.__name__} for {domain_name}: {e}")
|
|
return cookies
|
|
|
|
def _g4f(domain_name: str) -> list:
|
|
"""
|
|
Load cookies from the 'g4f' browser (if exists).
|
|
|
|
Args:
|
|
domain_name (str): The domain for which to load cookies.
|
|
|
|
Returns:
|
|
list: List of cookies.
|
|
"""
|
|
user_data_dir = user_config_dir("g4f")
|
|
cookie_file = os.path.join(user_data_dir, "Default", "Cookies")
|
|
return [] if not os.path.exists(cookie_file) else chrome(cookie_file, domain_name)
|
|
|
|
def format_prompt(messages: Messages, add_special_tokens=False) -> str:
|
|
"""
|
|
Format a series of messages into a single string, optionally adding special tokens.
|
|
|
|
Args:
|
|
messages (Messages): A list of message dictionaries, each containing 'role' and 'content'.
|
|
add_special_tokens (bool): Whether to add special formatting tokens.
|
|
|
|
Returns:
|
|
str: A formatted string containing all messages.
|
|
"""
|
|
if not add_special_tokens and len(messages) <= 1:
|
|
return messages[0]["content"]
|
|
formatted = "\n".join([
|
|
f'{message["role"].capitalize()}: {message["content"]}'
|
|
for message in messages
|
|
])
|
|
return f"{formatted}\nAssistant:"
|
|
|
|
def get_random_string(length: int = 10) -> str:
|
|
"""
|
|
Generate a random string of specified length, containing lowercase letters and digits.
|
|
|
|
Args:
|
|
length (int, optional): Length of the random string to generate. Defaults to 10.
|
|
|
|
Returns:
|
|
str: A random string of the specified length.
|
|
"""
|
|
return ''.join(
|
|
random.choice(string.ascii_lowercase + string.digits)
|
|
for _ in range(length)
|
|
)
|
|
|
|
def get_random_hex() -> str:
|
|
"""
|
|
Generate a random hexadecimal string of a fixed length.
|
|
|
|
Returns:
|
|
str: A random hexadecimal string of 32 characters (16 bytes).
|
|
"""
|
|
return secrets.token_hex(16).zfill(32) |