mirror of https://github.com/xtekky/gpt4free
Add Gemini Provider with image upload and generation
parent
2a591b8d6e
commit
c1b992c346
@ -0,0 +1,165 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import random
|
||||
import re
|
||||
|
||||
from aiohttp import ClientSession
|
||||
|
||||
from ...typing import Messages, Cookies, ImageType, AsyncResult
|
||||
from ..base_provider import AsyncGeneratorProvider
|
||||
from ..helper import format_prompt, get_cookies
|
||||
from ...errors import MissingAuthError
|
||||
from ...image import to_bytes, ImageResponse
|
||||
|
||||
REQUEST_HEADERS = {
|
||||
"authority": "gemini.google.com",
|
||||
"origin": "https://gemini.google.com",
|
||||
"referer": "https://gemini.google.com/",
|
||||
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
|
||||
'x-same-domain': '1',
|
||||
}
|
||||
REQUEST_BL_PARAM = "boq_assistant-bard-web-server_20240201.08_p8"
|
||||
REQUEST_URL = "https://gemini.google.com/_/BardChatUi/data/assistant.lamda.BardFrontendService/StreamGenerate"
|
||||
UPLOAD_IMAGE_URL = "https://content-push.googleapis.com/upload/"
|
||||
UPLOAD_IMAGE_HEADERS = {
|
||||
"authority": "content-push.googleapis.com",
|
||||
"accept": "*/*",
|
||||
"accept-language": "en-US,en;q=0.7",
|
||||
"authorization": "Basic c2F2ZXM6cyNMdGhlNmxzd2F2b0RsN3J1d1U=",
|
||||
"content-type": "application/x-www-form-urlencoded;charset=UTF-8",
|
||||
"origin": "https://gemini.google.com",
|
||||
"push-id": "feeds/mcudyrk2a4khkz",
|
||||
"referer": "https://gemini.google.com/",
|
||||
"x-goog-upload-command": "start",
|
||||
"x-goog-upload-header-content-length": "",
|
||||
"x-goog-upload-protocol": "resumable",
|
||||
"x-tenant-id": "bard-storage",
|
||||
}
|
||||
|
||||
class Gemini(AsyncGeneratorProvider):
|
||||
url = "https://gemini.google.com"
|
||||
needs_auth = True
|
||||
working = True
|
||||
supports_stream = False
|
||||
|
||||
@classmethod
|
||||
async def create_async_generator(
|
||||
cls,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
proxy: str = None,
|
||||
cookies: Cookies = None,
|
||||
image: ImageType = None,
|
||||
image_name: str = None,
|
||||
**kwargs
|
||||
) -> AsyncResult:
|
||||
prompt = format_prompt(messages)
|
||||
if not cookies:
|
||||
cookies = get_cookies(".google.com", False)
|
||||
if "__Secure-1PSID" not in cookies:
|
||||
raise MissingAuthError('Missing "__Secure-1PSID" cookie')
|
||||
|
||||
image_url = await cls.upload_image(to_bytes(image), image_name, proxy) if image else None
|
||||
|
||||
async with ClientSession(
|
||||
cookies=cookies,
|
||||
headers=REQUEST_HEADERS
|
||||
) as session:
|
||||
async with session.get(cls.url, proxy=proxy) as response:
|
||||
text = await response.text()
|
||||
match = re.search(r'SNlM0e\":\"(.*?)\"', text)
|
||||
if match:
|
||||
snlm0e = match.group(1)
|
||||
else:
|
||||
raise RuntimeError("SNlM0e not found")
|
||||
|
||||
params = {
|
||||
'bl': REQUEST_BL_PARAM,
|
||||
'_reqid': random.randint(1111, 9999),
|
||||
'rt': 'c'
|
||||
}
|
||||
data = {
|
||||
'at': snlm0e,
|
||||
'f.req': json.dumps([None, json.dumps(cls.build_request(
|
||||
prompt,
|
||||
image_url=image_url,
|
||||
image_name=image_name
|
||||
))])
|
||||
}
|
||||
async with session.post(
|
||||
REQUEST_URL,
|
||||
data=data,
|
||||
params=params,
|
||||
proxy=proxy
|
||||
) as response:
|
||||
response = await response.text()
|
||||
response_part = json.loads(json.loads(response.splitlines()[-5])[0][2])
|
||||
if response_part[4] is None:
|
||||
response_part = json.loads(json.loads(response.splitlines()[-7])[0][2])
|
||||
|
||||
content = response_part[4][0][1][0]
|
||||
image_prompt = None
|
||||
match = re.search(r'\[Imagen of (.*?)\]', content)
|
||||
if match:
|
||||
image_prompt = match.group(1)
|
||||
content = content.replace(match.group(0), '')
|
||||
|
||||
yield content
|
||||
if image_prompt:
|
||||
images = [image[0][3][3] for image in response_part[4][0][12][7][0]]
|
||||
yield ImageResponse(images, image_prompt)
|
||||
|
||||
def build_request(
|
||||
prompt: str,
|
||||
conversation_id: str = "",
|
||||
response_id: str = "",
|
||||
choice_id: str = "",
|
||||
image_url: str = None,
|
||||
image_name: str = None,
|
||||
tools: list[list[str]] = []
|
||||
) -> list:
|
||||
image_list = [[[image_url, 1], image_name]] if image_url else []
|
||||
return [
|
||||
[prompt, 0, None, image_list, None, None, 0],
|
||||
["en"],
|
||||
[conversation_id, response_id, choice_id, None, None, []],
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
[1],
|
||||
0,
|
||||
[],
|
||||
tools,
|
||||
1,
|
||||
0,
|
||||
]
|
||||
|
||||
async def upload_image(image: bytes, image_name: str = None, proxy: str = None):
|
||||
async with ClientSession(
|
||||
headers=UPLOAD_IMAGE_HEADERS
|
||||
) as session:
|
||||
async with session.options(UPLOAD_IMAGE_URL, proxy=proxy) as reponse:
|
||||
reponse.raise_for_status()
|
||||
|
||||
headers = {
|
||||
"size": str(len(image)),
|
||||
"x-goog-upload-command": "start"
|
||||
}
|
||||
data = f"File name: {image_name}" if image_name else None
|
||||
async with session.post(
|
||||
UPLOAD_IMAGE_URL, headers=headers, data=data, proxy=proxy
|
||||
) as response:
|
||||
response.raise_for_status()
|
||||
upload_url = response.headers["X-Goog-Upload-Url"]
|
||||
|
||||
async with session.options(upload_url, headers=headers) as response:
|
||||
response.raise_for_status()
|
||||
|
||||
headers["x-goog-upload-command"] = "upload, finalize"
|
||||
headers["X-Goog-Upload-Offset"] = "0"
|
||||
async with session.post(
|
||||
upload_url, headers=headers, data=image, proxy=proxy
|
||||
) as response:
|
||||
response.raise_for_status()
|
||||
return await response.text()
|
Loading…
Reference in New Issue