gpt4free/g4f/Provider/Bard.py

93 lines
2.7 KiB
Python
Raw Normal View History

from __future__ import annotations
2023-07-28 10:07:17 +00:00
import json
import random
import re
2023-08-18 09:45:35 +00:00
from aiohttp import ClientSession
2023-07-28 10:07:17 +00:00
from .base_provider import AsyncProvider, format_prompt, get_cookies
2023-07-28 10:07:17 +00:00
class _ClassProperty:
    """Read-only descriptor that computes a value from the owning class.

    Used instead of stacking ``@classmethod`` on ``@property``: class methods
    can only wrap other descriptors on Python 3.9-3.12 (the feature was
    deprecated in 3.11 and removed in 3.13), so the stacked form silently
    stops returning the computed value on other interpreter versions.
    """

    def __init__(self, getter):
        self._getter = getter
        self.__doc__ = getter.__doc__

    def __get__(self, instance, owner=None):
        # Supports both ``Bard.params`` and ``Bard().params``.
        return self._getter(owner if owner is not None else type(instance))


class Bard(AsyncProvider):
    """Async provider that sends a single prompt to the Bard web endpoint."""

    url = "https://bard.google.com"
    needs_auth = True
    working = True
    # "SNlM0e" token scraped from the Bard page. It is cached on the class, so
    # it is fetched once and then reused by every later call.
    # NOTE(review): the cache is not keyed by cookies; a call with different
    # cookies reuses the first token - confirm this is intended.
    _snlm0e = None

    @classmethod
    async def create_async(
        cls,
        model: str,
        messages: list[dict[str, str]],
        proxy: str = None,
        cookies: dict = None,
        **kwargs
    ) -> str:
        """Send ``messages`` to Bard and return the text of its reply.

        Args:
            model: Accepted for interface compatibility; not used here.
            messages: Chat messages, flattened into one prompt by
                ``format_prompt``.
            proxy: Optional proxy address. ``http://`` is prepended when the
                value carries no scheme.
            cookies: Cookies for the session. When empty or omitted they are
                loaded with ``get_cookies(".google.com")``.
            **kwargs: Ignored.

        Returns:
            The reply text extracted from the StreamGenerate response.

        Raises:
            RuntimeError: If the SNlM0e token cannot be found on the Bard
                page, or if the response does not have the expected layout.
        """
        prompt = format_prompt(messages)

        if proxy and "://" not in proxy:
            proxy = f"http://{proxy}"
        if not cookies:
            cookies = get_cookies(".google.com")

        headers = {
            'authority': 'bard.google.com',
            'origin': 'https://bard.google.com',
            'referer': 'https://bard.google.com/',
            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
            'x-same-domain': '1',
        }

        async with ClientSession(
            cookies=cookies,
            headers=headers
        ) as session:
            if not cls._snlm0e:
                # The token is embedded in the HTML of the landing page.
                async with session.get(cls.url, proxy=proxy) as response:
                    text = await response.text()

                match = re.search(r'SNlM0e\":\"(.*?)\"', text)
                if not match:
                    raise RuntimeError("No snlm0e value.")
                cls._snlm0e = match.group(1)

            params = {
                'bl': 'boq_assistant-bard-web-server_20230326.21_p0',
                '_reqid': random.randint(1111, 9999),
                'rt': 'c'
            }

            data = {
                'at': cls._snlm0e,
                # The prompt payload is JSON-encoded twice: once on its own
                # and once more inside the outer request envelope.
                'f.req': json.dumps([None, json.dumps([[prompt]])])
            }

            intents = '.'.join([
                'assistant',
                'lamda',
                'BardFrontendService'
            ])

            async with session.post(
                f'{cls.url}/_/BardChatUi/data/{intents}/StreamGenerate',
                data=data,
                params=params,
                proxy=proxy
            ) as response:
                body = await response.text()

            try:
                # The fourth line of the body holds the JSON envelope; its
                # [0][2] entry is itself a JSON string containing the reply.
                envelope = json.loads(body.splitlines()[3])[0][2]
                return json.loads(envelope)[4][0][1][0]
            except (IndexError, KeyError, TypeError, ValueError) as err:
                # A missing line, a null payload or malformed JSON all mean
                # the service did not return a usable answer.
                raise RuntimeError(
                    "Unexpected response format from Bard."
                ) from err

    @_ClassProperty
    def params(cls):
        """Return a human-readable summary of the supported parameters."""
        params = [
            ("model", "str"),
            ("messages", "list[dict[str, str]]"),
            ("stream", "bool"),
            ("proxy", "str"),
        ]
        param = ", ".join([": ".join(p) for p in params])
        return f"g4f.provider.{cls.__name__} supports: ({param})"