Add webdriver module

pull/1274/head
Heiner Lohaus 11 months ago
parent 702837a33a
commit 08e308348b

@ -5,7 +5,8 @@ import random
from ..typing import CreateResult, Messages
from .base_provider import BaseProvider
from .helper import WebDriver, WebDriverSession, format_prompt, get_random_string
from .helper import format_prompt, get_random_string
from .webdriver import WebDriver, WebDriverSession
from .. import debug
class AItianhuSpace(BaseProvider):
@ -24,7 +25,7 @@ class AItianhuSpace(BaseProvider):
domain: str = None,
proxy: str = None,
timeout: int = 120,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
headless: bool = True,
**kwargs
) -> CreateResult:
@ -39,7 +40,7 @@ class AItianhuSpace(BaseProvider):
url = f"https://{domain}"
prompt = format_prompt(messages)
with WebDriverSession(web_driver, "", headless=headless, proxy=proxy) as driver:
with WebDriverSession(webdriver, "", headless=headless, proxy=proxy) as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

@ -4,7 +4,8 @@ import time, json
from ..typing import CreateResult, Messages
from .base_provider import BaseProvider
from .helper import WebDriver, WebDriverSession, format_prompt
from .helper import format_prompt
from .webdriver import WebDriver, WebDriverSession
class MyShell(BaseProvider):
url = "https://app.myshell.ai/chat"
@ -20,10 +21,10 @@ class MyShell(BaseProvider):
stream: bool,
proxy: str = None,
timeout: int = 120,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
**kwargs
) -> CreateResult:
with WebDriverSession(web_driver, "", proxy=proxy) as driver:
with WebDriverSession(webdriver, "", proxy=proxy) as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
@ -52,15 +53,16 @@ response = await fetch("https://api.myshell.ai/v1/bot/chat/send_message", {
"body": '{body}',
"method": "POST"
})
window.reader = response.body.getReader();
window._reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
"""
driver.execute_script(script.replace("{body}", json.dumps(data)))
script = """
chunk = await window.reader.read();
if (chunk['done']) return null;
text = (new TextDecoder()).decode(chunk['value']);
chunk = await window._reader.read();
if (chunk['done']) {
return null;
}
content = '';
text.split('\\n').forEach((line, index) => {
chunk['value'].split('\\n').forEach((line, index) => {
if (line.startsWith('data: ')) {
try {
const data = JSON.parse(line.substring('data: '.length));

@ -4,7 +4,8 @@ import time
from ..typing import CreateResult, Messages
from .base_provider import BaseProvider
from .helper import WebDriver, WebDriverSession, format_prompt
from .helper import format_prompt
from .webdriver import WebDriver, WebDriverSession
class PerplexityAi(BaseProvider):
url = "https://www.perplexity.ai"
@ -20,12 +21,12 @@ class PerplexityAi(BaseProvider):
stream: bool,
proxy: str = None,
timeout: int = 120,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
virtual_display: bool = True,
copilot: bool = False,
**kwargs
) -> CreateResult:
with WebDriverSession(web_driver, "", virtual_display=virtual_display, proxy=proxy) as driver:
with WebDriverSession(webdriver, "", virtual_display=virtual_display, proxy=proxy) as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

@ -5,7 +5,8 @@ from urllib.parse import quote
from ..typing import CreateResult, Messages
from .base_provider import BaseProvider
from .helper import WebDriver, WebDriverSession, format_prompt
from .helper import format_prompt
from .webdriver import WebDriver, WebDriverSession
class Phind(BaseProvider):
url = "https://www.phind.com"
@ -21,11 +22,11 @@ class Phind(BaseProvider):
stream: bool,
proxy: str = None,
timeout: int = 120,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
creative_mode: bool = None,
**kwargs
) -> CreateResult:
with WebDriverSession(web_driver, "", proxy=proxy) as driver:
with WebDriverSession(webdriver, "", proxy=proxy) as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
@ -34,40 +35,38 @@ class Phind(BaseProvider):
driver.get(f"{cls.url}/search?q={prompt}&source=searchbox")
# Register fetch hook
driver.execute_script("""
source = """
window._fetch = window.fetch;
window.fetch = (url, options) => {
// Call parent fetch method
const result = window._fetch(url, options);
window.fetch = async (url, options) => {
const response = await window._fetch(url, options);
if (url != "/api/infer/answer") {
return result;
return response;
}
// Load response reader
result.then((response) => {
if (!response.body.locked) {
window._reader = response.body.getReader();
copy = response.clone();
window._reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
return copy;
}
});
// Return dummy response
return new Promise((resolve, reject) => {
resolve(new Response(new ReadableStream()))
});
}
""")
"""
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": source
})
# Need to change settings
if model.startswith("gpt-4") or creative_mode:
wait = WebDriverWait(driver, timeout)
def open_dropdown():
# Open settings dropdown
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "button.text-dark.dropdown-toggle")))
driver.find_element(By.CSS_SELECTOR, "button.text-dark.dropdown-toggle").click()
# Wait for dropdown toggle
wait.until(EC.visibility_of_element_located((By.XPATH, "//button[text()='GPT-4']")))
if model.startswith("gpt-4") or creative_mode:
# Enable GPT-4
if model.startswith("gpt-4"):
open_dropdown()
driver.find_element(By.XPATH, "//button[text()='GPT-4']").click()
# Enable creative mode
if creative_mode or creative_mode == None:
open_dropdown()
driver.find_element(By.ID, "Creative Mode").click()
# Submit changes
driver.find_element(By.CSS_SELECTOR, ".search-bar-input-group button[type='submit']").click()
@ -78,10 +77,11 @@ window.fetch = (url, options) => {
chunk = driver.execute_script("""
if(window._reader) {
chunk = await window._reader.read();
if (chunk['done']) return null;
text = (new TextDecoder()).decode(chunk['value']);
if (chunk['done']) {
return null;
}
content = '';
text.split('\\r\\n').forEach((line, index) => {
chunk['value'].split('\\r\\n').forEach((line, index) => {
if (line.startsWith('data: ')) {
line = line.substring('data: '.length);
if (!line.startsWith('<PHIND_METADATA>')) {

@ -4,7 +4,7 @@ import time, json, time
from ..typing import CreateResult, Messages
from .base_provider import BaseProvider
from .helper import WebDriver, WebDriverSession
from .webdriver import WebDriver, WebDriverSession
class TalkAi(BaseProvider):
url = "https://talkai.info"
@ -19,10 +19,10 @@ class TalkAi(BaseProvider):
messages: Messages,
stream: bool,
proxy: str = None,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
**kwargs
) -> CreateResult:
with WebDriverSession(web_driver, "", virtual_display=True, proxy=proxy) as driver:
with WebDriverSession(webdriver, "", virtual_display=True, proxy=proxy) as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC

@ -6,7 +6,6 @@ import webbrowser
import random
import string
import secrets
import time
from os import path
from asyncio import AbstractEventLoop
from platformdirs import user_config_dir
@ -21,26 +20,8 @@ from browser_cookie3 import (
firefox,
BrowserCookieError
)
try:
from selenium.webdriver.remote.webdriver import WebDriver
except ImportError:
class WebDriver():
pass
try:
from undetected_chromedriver import Chrome, ChromeOptions
except ImportError:
class Chrome():
def __init__():
raise RuntimeError('Please install the "undetected_chromedriver" package')
class ChromeOptions():
def add_argument():
pass
try:
from pyvirtualdisplay import Display
except ImportError:
pass
from ..typing import Dict, Messages, Union, Tuple
from ..typing import Dict, Messages
from .. import debug
# Change event loop policy on windows
@ -135,74 +116,11 @@ def format_prompt(messages: Messages, add_special_tokens=False) -> str:
return f"{formatted}\nAssistant:"
def get_browser(
user_data_dir: str = None,
headless: bool = False,
proxy: str = None,
options: ChromeOptions = None
) -> Chrome:
if user_data_dir == None:
user_data_dir = user_config_dir("g4f")
if proxy:
if not options:
options = ChromeOptions()
options.add_argument(f'--proxy-server={proxy}')
return Chrome(options=options, user_data_dir=user_data_dir, headless=headless)
class WebDriverSession():
def __init__(
self,
web_driver: WebDriver = None,
user_data_dir: str = None,
headless: bool = False,
virtual_display: bool = False,
proxy: str = None,
options: ChromeOptions = None
):
self.web_driver = web_driver
self.user_data_dir = user_data_dir
self.headless = headless
self.virtual_display = virtual_display
self.proxy = proxy
self.options = options
def reopen(
self,
user_data_dir: str = None,
headless: bool = False,
virtual_display: bool = False
) -> WebDriver:
if user_data_dir == None:
user_data_dir = self.user_data_dir
self.default_driver.quit()
if not virtual_display and self.virtual_display:
self.virtual_display.stop()
self.default_driver = get_browser(user_data_dir, headless, self.proxy)
return self.default_driver
def __enter__(self) -> WebDriver:
if self.web_driver:
return self.web_driver
if self.virtual_display == True:
self.virtual_display = Display(size=(1920,1080))
self.virtual_display.start()
self.default_driver = get_browser(self.user_data_dir, self.headless, self.proxy, self.options)
return self.default_driver
def __exit__(self, exc_type, exc_val, exc_tb):
if self.default_driver:
self.default_driver.close()
time.sleep(0.1)
self.default_driver.quit()
if self.virtual_display:
self.virtual_display.stop()
def get_random_string(length: int = 10) -> str:
return ''.join(
random.choice(string.ascii_lowercase + string.digits)
for _ in range(length)
)
def get_random_hex() -> str:
return secrets.token_hex(16).zfill(32)

@ -4,7 +4,8 @@ import time
from ...typing import CreateResult, Messages
from ..base_provider import BaseProvider
from ..helper import WebDriver, WebDriverSession, format_prompt
from ..helper import format_prompt
from ..webdriver import WebDriver, WebDriverSession
class Bard(BaseProvider):
url = "https://bard.google.com"
@ -18,13 +19,13 @@ class Bard(BaseProvider):
messages: Messages,
stream: bool,
proxy: str = None,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
user_data_dir: str = None,
headless: bool = True,
**kwargs
) -> CreateResult:
prompt = format_prompt(messages)
session = WebDriverSession(web_driver, user_data_dir, headless, proxy=proxy)
session = WebDriverSession(webdriver, user_data_dir, headless, proxy=proxy)
with session as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
@ -36,8 +37,8 @@ class Bard(BaseProvider):
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea")))
except:
# Reopen browser for login
if not web_driver:
driver = session.reopen(headless=False)
if not webdriver:
driver = session.reopen()
driver.get(f"{cls.url}/chat")
wait = WebDriverWait(driver, 240)
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea")))

@ -60,16 +60,3 @@ class HuggingChat(AsyncGeneratorProvider):
async with session.delete(f"{cls.url}/conversation/{conversation_id}", proxy=proxy) as response:
response.raise_for_status()
@classmethod
@property
def params(cls):
params = [
("model", "str"),
("messages", "list[dict[str, str]]"),
("stream", "bool"),
("proxy", "str"),
]
param = ", ".join([": ".join(p) for p in params])
return f"g4f.provider.{cls.__name__} supports: ({param})"

@ -87,15 +87,3 @@ class OpenAssistant(AsyncGeneratorProvider):
}
async with session.delete("https://open-assistant.io/api/chat", proxy=proxy, params=params) as response:
response.raise_for_status()
@classmethod
@property
def params(cls):
params = [
("model", "str"),
("messages", "list[dict[str, str]]"),
("stream", "bool"),
("proxy", "str"),
]
param = ", ".join([": ".join(p) for p in params])
return f"g4f.provider.{cls.__name__} supports: ({param})"

@ -6,7 +6,8 @@ from asyncstdlib.itertools import tee
from async_property import async_cached_property
from ..base_provider import AsyncGeneratorProvider
from ..helper import get_browser, get_event_loop
from ..helper import get_event_loop
from ..webdriver import get_browser
from ...typing import AsyncResult, Messages
from ...requests import StreamSession
@ -38,7 +39,10 @@ class OpenaiChat(AsyncGeneratorProvider):
**kwargs
) -> Response:
if prompt:
messages.append({"role": "user", "content": prompt})
messages.append({
"role": "user",
"content": prompt
})
generator = cls.create_async_generator(
model,
messages,
@ -49,12 +53,9 @@ class OpenaiChat(AsyncGeneratorProvider):
response_fields=True,
**kwargs
)
fields: ResponseFields = await anext(generator)
if "access_token" not in kwargs:
kwargs["access_token"] = cls._access_token
return Response(
generator,
fields,
await anext(generator),
action,
messages,
kwargs
@ -87,7 +88,6 @@ class OpenaiChat(AsyncGeneratorProvider):
headers = {
"Accept": "text/event-stream",
"Authorization": f"Bearer {access_token}",
"Cookie": 'intercom-device-id-dgkjq2bp=0f047573-a750-46c8-be62-6d54b56e7bf0; ajs_user_id=user-iv3vxisaoNodwWpxmNpMfekH; ajs_anonymous_id=fd91be0b-0251-4222-ac1e-84b1071e9ec1; __Host-next-auth.csrf-token=d2b5f67d56f7dd6a0a42ae4becf2d1a6577b820a5edc88ab2018a59b9b506886%7Ce5c33eecc460988a137cbc72d90ee18f1b4e2f672104f368046df58e364376ac; _cfuvid=gt_mA.q6rue1.7d2.AR0KHpbVBS98i_ppfi.amj2._o-1700353424353-0-604800000; cf_clearance=GkHCfPSFU.NXGcHROoe4FantnqmnNcluhTNHz13Tk.M-1700353425-0-1-dfe77f81.816e9bc2.714615da-0.2.1700353425; __Secure-next-auth.callback-url=https%3A%2F%2Fchat.openai.com; intercom-session-dgkjq2bp=UWdrS1hHazk5VXN1c0V5Q1F0VXdCQmsyTU9pVjJMUkNpWnFnU3dKWmtIdGwxTC9wbjZuMk5hcEc0NWZDOGdndS0tSDNiaDNmMEdIL1RHU1dFWDBwOHFJUT09--f754361b91fddcd23a13b288dcb2bf8c7f509e91; _uasid="Z0FBQUFBQmxXVnV0a3dmVno4czRhcDc2ZVcwaUpSNUdZejlDR25YSk5NYTJQQkpyNmRvOGxjTHMyTlAxWmJhaURrMVhjLXZxQXdZeVpBbU1aczA5WUpHT2dwaS1MOWc4MnhyNWFnbGRzeGdJcGFKT0ZRdnBTMVJHcGV2MGNTSnVQY193c0hqUWIycHhQRVF4dENlZ3phcDdZeHgxdVhoalhrZmtZME9NbWhMQjdVR3Vzc3FRRk0ybjJjNWMwTWtIRjdPb19lUkFtRmV2MDVqd1kwWU11QTYtQkdZenEzVHhLMGplY1hZM3FlYUt1cVZaNWFTRldleEJETzJKQjk1VTJScy1GUnMxUVZWMnVxYklxMjdockVZbkZyd1R4U1RtMnA1ZzlSeXphdmVOVk9xeEdrRkVOSjhwTVd1QzFtQjhBcWdDaE92Q1VlM2pwcjFQTXRuLVJNRVlZSGpIdlZ0aGV3PT0="; _dd_s=rum=0&expire=1700356244884; __Secure-next-auth.session-token=eyJhbGciOiJkaXIiLCJlbmMiOiJBMjU2R0NNIn0..3aK6Fbdy2_8f07bf.8eT2xgonrCnz7ySY6qXFsg3kzL6UQfXKAYaw3tyn-6_X9657zy47k9qGvmi9mF0QKozj5jau3_Ca62AQQ7FmeC6Y2F1urtzqrXqwTTsQ2LuzFPIQkx6KKb2DXc8zW2-oyEzJ_EY5yxfLB2RlRkSh3M7bYNZh4_ltEcfkj38s_kIPGMxv34udtPWGWET99MCjkdwQWXylJag4s0fETA0orsBAKnGCyqAUNJbb_D7BYtGSV-MQ925kZMG6Di_QmfO0HQWURDYjmdRNcuy1PT_xJ1DJko8sjL42i4j3RhkNDkhqCIqyYImz2eHFWHW7rYKxTkrBhlCPMS5hRdcCswD7JYPcSBiwnVRYgyOocFGXoFvQgIZ2FX9NiZ3SMEVM1VwIGSE-qH0H2nMa8_iBvsOgOWJgKjVAvzzyzZvRVDUUHzJrikSFPNONVDU3h-04c1kVL4qIu9DfeTPN7n8AvNmYwMbro0L9-IUAeXNo4-pwF0Kt-AtTsamqWvMqnK4O_YOyLnDDlvkmnOvDC2d5uinwlQIxr6APO6qFfGLlHiLZemKoekxEE1Fx70dl-Ouhk1VIzbF3OC6XNNxeBm9BUYUiHdL0wj2H9rHgX4cz6ZmS_3VTgpD6UJh-evu5KJ2gIvjYmVbyzEN0aPNDxfvBaOm-Ezpy4bUJ2bUrOwNn-0knWkDiTvjYmNhCyefPCtCF6rpKNay8PCw_yh79C4SdEP6Q4V7LI0Tvdi5uz7kLCiBC4AT9L0ao1WDX03mkUOpjvzHDvPLmj8chW3lTVm_kA0eYGQY4wT0jzleWlfV0Q8rB2oYECNLWksA3F1zlGfcl4lQjprvTXRePkvAbMpoJEsZD3Ylq7-foLDLk4-M2LYAFZDs282AY04sFjAjQBxTELFCCuDgTIgTXSIskY_XCxpVXDbdLlbCJY7XVK45ybwtfqwlKRp8Mo0B131uQAFc-migHaUaoGujxJJk21bP8F0OmhNYHBo4FQqE1rQm2JH5bNM7txKeh5KXdJgVUVbRSr7OIp_OF5-Bx_v9eRBGAIDkue26E2-O8Rnrp5zQ5TnvecQLDaUzWavCLPwsZ0_gsOLBxNOmauNYZtF8IElCsQSFDdhoiMxXsYUm4ZYKEAy3GWq8HGTAvBhNkh1hvnI7y-d8-DOaZf_D_D98-olZfm-LUkeosLNpPB9rxYMqViCiW3KrXE9Yx0wlFm5ePKaVvR7Ym_EPhSOhJBKFPCvdTdMZSNPUcW0ZJBVByq0A9sxD51lYq3gaFyqh94S4s_ox182AQ3szGzHkdgLcnQmJG9OYvKxAVcd43eg6_gODAYhx02GjbMw-7JTAhyXSeCrlMteHyOXl8hai-3LilC3PmMzi7Vbu49dhF1s4LcVlUowen5ira44rQQaB26mdaOUoQfodgt66M3RTWGPXyK1Nb72AzSXsCKyaQPbzeb6cN0fdGSdG4ktwvR04eFNEkquo_3aKu2GmUKTD0XcRx9dYrfXjgY-X1DDTVs1YND2gRhdx7FFEeBVjtbj2UqmG3Rvd4IcHGe7OnYWw2MHDcol68SsR1KckXWwWREz7YTGUnDB2M1kx_H4W2mjclytnlHOnYU3RflegRPeSTbdzUZJvGKXCCz45luHkQWN_4DExE76D-9YqbFIz-RY5yL4h-Zs-i2xjm2K-4xCMM9nQIOqhLMqixIZQ2ldDAidKoYtbs5ppzbcBLyrZM96bq9DwRBY3aacqWdlRd-TfX0wv5KO4fo0sSh5FsuhuN0zcEV_NNXgqIEM_p14EcPqgbrAvCBQ8os70TRBQLXiF0EniSofGjxwF8kQvUk3C6Wfc8cTTeN-E6GxCVTn91HBwA1iSEZlRLMVb8_BcRJNqwbgnb_07jR6-eo42u88CR3KQdAWwbQRdMxsURFwZ0ujHXVGG0Ll6qCFBcHXWyDO1x1yHdHnw8_8yF26pnA2iPzrFR-8glMgIA-639sLuGAxjO1_ZuvJ9CAB41Az9S_jaZwaWy215Hk4-BRYD-MKmHtonwo3rrxhE67WJgbbu14efsw5nT6ow961pffgwXov5VA1Rg7nv1E8RvQOx7umWW6o8R4W6L8f2COsmPTXfgwIjoJKkjhUqAQ8ceG7cM0ET-38yaC0ObU8EkXfdGGgxI28qTEZWczG66_iM4hw7QEGCY5Cz2kbO6LETAiw9OsSigtBvDS7f0Ou0bZ41pdK7G3FmvdZAnjWPjObnDF4k4uWfn7mzt0fgj3FyqK20JezRDyGuAbUUhOvtZpc9sJpzxR34eXEZTouuALrHcGuNij4z6rx51FrQsaMtiup8QVrhtZbXtKLMYnWYSbkhuTeN2wY-xV1ZUsQlakIZszzGF7kuIG87KKWMpuPMvbXjz6Pp_gWJiIC6aQuk8xl5g0iBPycf_6Q-MtpuYxzNE2TpI1RyR9mHeXmteoRzrFiWp7yEC-QGNFyAJgxTqxM3CjHh1Jt6IddOsmn89rUo1dZM2Smijv_fbIv3avXLkIPX1KZjILeJCtpU0wAdsihDaRiRgDdx8fG__F8zuP0n7ziHas73cwrfg-Ujr6DhC0gTNxyd9dDA_oho9N7CQcy6EFmfNF2te7zpLony0859jtRv2t1TnpzAa1VvMK4u6mXuJ2XDo04_6GzLO3aPHinMdl1BcIAWnqAqWAu3euGFLTHOhXlfijut9N1OCifd_zWjhVtzlR39uFeCQBU5DyQArzQurdoMx8U1ETsnWgElxGSStRW-YQoPsAJ87eg9trqKspFpTVlAVN3t1GtoEAEhcwhe81SDssLmKGLc.7PqS6jRGTIfgTPlO7Ognvg; __cf_bm=VMWoAKEB45hQSwxXtnYXcurPaGZDJS4dMi6dIMFLwdw-1700355394-0-ATVsbq97iCaTaJbtYr8vtg1Zlbs3nLrJLKVBHYa2Jn7hhkGclqAy8Gbyn5ePEhDRqj93MsQmtayfYLqY5n4WiLY=; __cflb=0H28vVfF4aAyg2hkHFH9CkdHRXPsfCUf6VpYf2kz3RX'
}
async with StreamSession(
proxies={"https": proxy},
@ -95,6 +95,8 @@ class OpenaiChat(AsyncGeneratorProvider):
headers=headers,
timeout=timeout
) as session:
end_turn = EndTurn()
while not end_turn.is_end:
data = {
"action": action,
"arkose_token": await get_arkose_token(proxy, timeout),
@ -109,10 +111,6 @@ class OpenaiChat(AsyncGeneratorProvider):
"author": {"role": "user"},
"content": {"content_type": "text", "parts": [messages[-1]["content"]]},
}]
first = True
end_turn = EndTurn()
while first or auto_continue and not end_turn.is_end:
first = False
async with session.post(f"{cls.url}/backend-api/conversation", json=data) as response:
try:
response.raise_for_status()
@ -120,7 +118,8 @@ class OpenaiChat(AsyncGeneratorProvider):
raise RuntimeError(f"Error {response.status_code}: {await response.text()}")
last_message = 0
async for line in response.iter_lines():
if line.startswith(b"data: "):
if not line.startswith(b"data: "):
continue
line = line[6:]
if line == b"[DONE]":
break
@ -146,17 +145,11 @@ class OpenaiChat(AsyncGeneratorProvider):
yield new_message[last_message:]
last_message = len(new_message)
if "finish_details" in line["message"]["metadata"]:
if line["message"]["metadata"]["finish_details"]["type"] == "max_tokens":
if line["message"]["metadata"]["finish_details"]["type"] == "stop":
end_turn.end()
data = {
"action": "continue",
"arkose_token": await get_arkose_token(proxy, timeout),
"conversation_id": conversation_id,
"parent_message_id": parent_id,
"model": models[model],
"history_and_training_disabled": False,
}
if not auto_continue:
break
action = "continue"
await asyncio.sleep(5)
@classmethod
@ -167,7 +160,7 @@ class OpenaiChat(AsyncGeneratorProvider):
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver = get_browser("~/openai", proxy=proxy)
driver = get_browser(proxy=proxy)
except ImportError:
return
try:
@ -193,18 +186,6 @@ class OpenaiChat(AsyncGeneratorProvider):
raise RuntimeError("Read access token failed")
return cls._access_token
@classmethod
@property
def params(cls):
params = [
("model", "str"),
("messages", "list[dict[str, str]]"),
("stream", "bool"),
("proxy", "str"),
("access_token", "str"),
]
param = ", ".join([": ".join(p) for p in params])
return f"g4f.provider.{cls.__name__} supports: ({param})"
async def get_arkose_token(proxy: str = None, timeout: int = None) -> str:
config = {
@ -293,7 +274,7 @@ class Response():
async def variant(self, **kwargs) -> Response:
if self.action != "next":
raise RuntimeError("Can't create variant with continue or variant request.")
raise RuntimeError("Can't create variant from continue or variant request.")
return await OpenaiChat.create(
**self._options,
messages=self._messages,

@ -4,7 +4,8 @@ import time
from ...typing import CreateResult, Messages
from ..base_provider import BaseProvider
from ..helper import WebDriver, WebDriverSession, format_prompt
from ..helper import format_prompt
from ..webdriver import WebDriver, WebDriverSession
models = {
"meta-llama/Llama-2-7b-chat-hf": {"name": "Llama-2-7b"},
@ -33,7 +34,7 @@ class Poe(BaseProvider):
messages: Messages,
stream: bool,
proxy: str = None,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
user_data_dir: str = None,
headless: bool = True,
**kwargs
@ -44,7 +45,7 @@ class Poe(BaseProvider):
raise ValueError(f"Model are not supported: {model}")
prompt = format_prompt(messages)
session = WebDriverSession(web_driver, user_data_dir, headless, proxy=proxy)
session = WebDriverSession(webdriver, user_data_dir, headless, proxy=proxy)
with session as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
@ -80,8 +81,8 @@ class Poe(BaseProvider):
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "textarea[class^='GrowingTextArea']")))
except:
# Reopen browser for login
if not web_driver:
driver = session.reopen(headless=False)
if not webdriver:
driver = session.reopen()
driver.get(f"{cls.url}/{models[model]['name']}")
wait = WebDriverWait(driver, 240)
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, "textarea[class^='GrowingTextArea']")))

@ -60,18 +60,3 @@ class Raycast(BaseProvider):
token = completion_chunk['text']
if token != None:
yield token
@classmethod
@property
def params(cls):
params = [
("model", "str"),
("messages", "list[dict[str, str]]"),
("stream", "bool"),
("temperature", "float"),
("top_p", "int"),
("model", "str"),
("auth", "str"),
]
param = ", ".join([": ".join(p) for p in params])
return f"g4f.provider.{cls.__name__} supports: ({param})"

@ -4,7 +4,8 @@ import time
from ...typing import CreateResult, Messages
from ..base_provider import BaseProvider
from ..helper import WebDriver, WebDriverSession, format_prompt
from ..helper import format_prompt
from ..webdriver import WebDriver, WebDriverSession
models = {
"theb-ai": "TheB.AI",
@ -44,14 +45,14 @@ class Theb(BaseProvider):
messages: Messages,
stream: bool,
proxy: str = None,
web_driver: WebDriver = None,
webdriver: WebDriver = None,
virtual_display: bool = True,
**kwargs
) -> CreateResult:
if model in models:
model = models[model]
prompt = format_prompt(messages)
web_session = WebDriverSession(web_driver, virtual_display=virtual_display, proxy=proxy)
web_session = WebDriverSession(webdriver, virtual_display=virtual_display, proxy=proxy)
with web_session as driver:
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
@ -61,22 +62,16 @@ class Theb(BaseProvider):
# Register fetch hook
script = """
window._fetch = window.fetch;
window.fetch = (url, options) => {
window.fetch = async (url, options) => {
// Call parent fetch method
const result = window._fetch(url, options);
const response = await window._fetch(url, options);
if (!url.startsWith("/api/conversation")) {
return result;
}
// Load response reader
result.then((response) => {
if (!response.body.locked) {
window._reader = response.body.getReader();
}
});
// Return dummy response
return new Promise((resolve, reject) => {
resolve(new Response(new ReadableStream()))
});
// Copy response
copy = response.clone();
window._reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
return copy;
}
window._last_message = "";
"""
@ -97,7 +92,6 @@ window._last_message = "";
wait = WebDriverWait(driver, 240)
wait.until(EC.visibility_of_element_located((By.ID, "textareaAutosize")))
time.sleep(200)
try:
driver.find_element(By.CSS_SELECTOR, ".driver-overlay").click()
driver.find_element(By.CSS_SELECTOR, ".driver-overlay").click()
@ -134,9 +128,8 @@ if(window._reader) {
if (chunk['done']) {
return null;
}
text = (new TextDecoder()).decode(chunk['value']);
message = '';
text.split('\\r\\n').forEach((line, index) => {
chunk['value'].split('\\r\\n').forEach((line, index) => {
if (line.startsWith('data: ')) {
try {
line = JSON.parse(line.substring('data: '.length));

@ -0,0 +1,92 @@
from __future__ import annotations
import time
from platformdirs import user_config_dir
try:
from selenium.webdriver.remote.webdriver import WebDriver
except ImportError:
class WebDriver():
pass
try:
from undetected_chromedriver import Chrome, ChromeOptions
except ImportError:
class Chrome():
def __init__():
raise RuntimeError('Please install the "undetected_chromedriver" package')
class ChromeOptions():
def add_argument():
pass
try:
from pyvirtualdisplay import Display
has_pyvirtualdisplay = True
except ImportError:
has_pyvirtualdisplay = False
def get_browser(
user_data_dir: str = None,
headless: bool = False,
proxy: str = None,
options: ChromeOptions = None
) -> Chrome:
if user_data_dir == None:
user_data_dir = user_config_dir("g4f")
if proxy:
if not options:
options = ChromeOptions()
options.add_argument(f'--proxy-server={proxy}')
return Chrome(options=options, user_data_dir=user_data_dir, headless=headless)
class WebDriverSession():
def __init__(
self,
webdriver: WebDriver = None,
user_data_dir: str = None,
headless: bool = False,
virtual_display: bool = False,
proxy: str = None,
options: ChromeOptions = None
):
self.webdriver = webdriver
self.user_data_dir = user_data_dir
self.headless = headless
self.virtual_display = None
if has_pyvirtualdisplay and virtual_display:
self.virtual_display = Display(size=(1920,1080))
self.proxy = proxy
self.options = options
self.default_driver = None
def reopen(
self,
user_data_dir: str = None,
headless: bool = False,
virtual_display: bool = False
) -> WebDriver:
if user_data_dir == None:
user_data_dir = self.user_data_dir
if self.default_driver:
self.default_driver.quit()
if not virtual_display and self.virtual_display:
self.virtual_display.stop()
self.virtual_display = None
self.default_driver = get_browser(user_data_dir, headless, self.proxy)
return self.default_driver
def __enter__(self) -> WebDriver:
if self.webdriver:
return self.webdriver
if self.virtual_display:
self.virtual_display.start()
self.default_driver = get_browser(self.user_data_dir, self.headless, self.proxy, self.options)
return self.default_driver
def __exit__(self, exc_type, exc_val, exc_tb):
if self.default_driver:
try:
self.default_driver.close()
except:
pass
time.sleep(0.1)
self.default_driver.quit()
if self.virtual_display:
self.virtual_display.stop()
Loading…
Cancel
Save