gpt4free/g4f/Provider/GptTalkRu.py

59 lines
2.3 KiB
Python
Raw Normal View History

2023-11-22 20:21:29 +00:00
from __future__ import annotations
from aiohttp import ClientSession, BaseConnector
2023-11-22 20:21:29 +00:00
from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider
from .helper import get_random_string, get_connector
from ..requests import raise_for_status, get_args_from_browser, WebDriver
from ..webdriver import has_seleniumwire
from ..errors import MissingRequirementsError
2023-11-22 20:21:29 +00:00
class GptTalkRu(AsyncGeneratorProvider):
url = "https://gpttalk.ru"
working = True
supports_gpt_35_turbo = True
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
proxy: str = None,
connector: BaseConnector = None,
webdriver: WebDriver = None,
2023-11-22 20:21:29 +00:00
**kwargs
) -> AsyncResult:
if not model:
model = "gpt-3.5-turbo"
if not has_seleniumwire:
raise MissingRequirementsError('Install "selenium-wire" package')
args = get_args_from_browser(f"{cls.url}", webdriver)
args["headers"]["accept"] = "application/json, text/plain, */*"
async with ClientSession(connector=get_connector(connector, proxy), **args) as session:
async with session.get("https://gpttalk.ru/getToken") as response:
await raise_for_status(response)
public_key = (await response.json())["response"]["key"]["publicKey"]
random_string = get_random_string(8)
2023-11-22 20:21:29 +00:00
data = {
"model": model,
"modelType": 1,
"prompt": messages,
"responseType": "stream",
"security": {
"randomMessage": random_string,
"shifrText": encrypt(public_key, random_string)
}
2023-11-22 20:21:29 +00:00
}
async with session.post(f"{cls.url}/gpt2", json=data, proxy=proxy) as response:
await raise_for_status(response)
2023-11-22 20:21:29 +00:00
async for chunk in response.content.iter_any():
2024-04-06 21:55:30 +00:00
yield chunk.decode(errors="ignore")
def encrypt(public_key: str, value: str) -> str:
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
import base64
rsa_key = RSA.importKey(public_key)
cipher = PKCS1_v1_5.new(rsa_key)
return base64.b64encode(cipher.encrypt(value.encode())).decode()