mirror of https://github.com/xtekky/gpt4free
commit
f2bdd83950
@ -1,111 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from ..typing import AsyncResult, Messages
|
||||
from .base_provider import AsyncGeneratorProvider
|
||||
|
||||
import json
|
||||
import cloudscraper
|
||||
|
||||
class PI(AsyncGeneratorProvider):
|
||||
url = "https://chat-gpt.com"
|
||||
working = True
|
||||
|
||||
@classmethod
|
||||
async def create_async_generator(
|
||||
cls,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
proxy: str = None,
|
||||
**kwargs
|
||||
) -> AsyncResult:
|
||||
Conversation = kwargs['conversation']
|
||||
UserPrompt = messages[-1]
|
||||
if UserPrompt['role'] == 'user':
|
||||
UserPrompt = UserPrompt['content']
|
||||
else:
|
||||
UserPrompt = messages[-2]['content']
|
||||
if Conversation == None:
|
||||
Conversation = PI.Start_Conversation()
|
||||
Answer = Ask_PI(UserPrompt,Conversation['sid'],Conversation['cookies'])
|
||||
|
||||
yield Answer[0]['text']
|
||||
|
||||
def Start_Conversation():
|
||||
scraper.headers = {
|
||||
'accept-type': 'application/json'
|
||||
}
|
||||
response = scraper.post('https://pi.ai/api/chat/start', data="{}",headers={'x-api-version': '3'})
|
||||
cookies = response.cookies
|
||||
|
||||
if 'Just a moment' in response.text:
|
||||
return {
|
||||
'error': 'cloudflare detected',
|
||||
'sid': None,
|
||||
'cookies': None,
|
||||
}
|
||||
return {
|
||||
'sid': response.json()['conversations'][0]['sid'],
|
||||
'cookies': cookies
|
||||
}
|
||||
|
||||
def GetConversationTitle(Conversation):
|
||||
response = scraper.post('https://pi.ai/api/chat/start', data="{}",headers={'x-api-version': '3'}, cookies=Conversation['cookies'])
|
||||
if 'Just a moment' in response.text:
|
||||
return {
|
||||
'error': 'cloudflare detected',
|
||||
'title': 'Couldnt get the title',
|
||||
}
|
||||
return {
|
||||
'title': response.json()['conversations'][0]['title']
|
||||
}
|
||||
|
||||
def GetChatHistory(Conversation):
|
||||
params = {
|
||||
'conversation': Conversation['sid'],
|
||||
}
|
||||
response = scraper.get('https://pi.ai/api/chat/history', params=params, cookies=Conversation['cookies'])
|
||||
if 'Just a moment' in response.text:
|
||||
return {
|
||||
'error': 'cloudflare detected',
|
||||
'traceback': 'Couldnt get the chat history'
|
||||
}
|
||||
return response.json()
|
||||
|
||||
session = cloudscraper.session()
|
||||
|
||||
scraper = cloudscraper.create_scraper(
|
||||
browser={
|
||||
'browser': 'chrome',
|
||||
'platform': 'windows',
|
||||
'desktop': True
|
||||
},
|
||||
sess=session
|
||||
)
|
||||
|
||||
scraper.headers = {
|
||||
'Accept': '*/*',
|
||||
'Accept-Encoding': 'deflate,gzip,br',
|
||||
}
|
||||
|
||||
def Ask_PI(message,sid,cookies):
|
||||
json_data = {
|
||||
'text': message,
|
||||
'conversation': sid,
|
||||
'mode': 'BASE',
|
||||
}
|
||||
response = scraper.post('https://pi.ai/api/chat', json=json_data, cookies=cookies)
|
||||
|
||||
if 'Just a moment' in response.text:
|
||||
return [{
|
||||
'error': 'cloudflare detected',
|
||||
'text': 'Couldnt generate the answer because we got detected by cloudflare please try again later'
|
||||
}
|
||||
]
|
||||
result = []
|
||||
for line in response.iter_lines(chunk_size=1024, decode_unicode=True):
|
||||
if line.startswith('data: {"text":'):
|
||||
result.append(json.loads(line.split('data: ')[1].encode('utf-8')))
|
||||
if line.startswith('data: {"title":'):
|
||||
result.append(json.loads(line.split('data: ')[1].encode('utf-8')))
|
||||
|
||||
return result
|
@ -0,0 +1,93 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from ..typing import CreateResult, Messages
|
||||
from .base_provider import BaseProvider, format_prompt
|
||||
|
||||
import json
|
||||
from cloudscraper import CloudScraper, session, create_scraper
|
||||
|
||||
class Pi(BaseProvider):
|
||||
url = "https://chat-gpt.com"
|
||||
working = True
|
||||
supports_stream = True
|
||||
|
||||
@classmethod
|
||||
def create_completion(
|
||||
cls,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
stream: bool,
|
||||
proxy: str = None,
|
||||
scraper: CloudScraper = None,
|
||||
conversation: dict = None,
|
||||
**kwargs
|
||||
) -> CreateResult:
|
||||
if not scraper:
|
||||
scraper = cls.get_scraper()
|
||||
if not conversation:
|
||||
conversation = cls.start_conversation(scraper)
|
||||
answer = cls.ask(scraper, messages, conversation)
|
||||
|
||||
last_answer = 0
|
||||
for line in answer:
|
||||
if "text" in line:
|
||||
yield line["text"][last_answer:]
|
||||
last_answer = len(line["text"])
|
||||
|
||||
def get_scraper():
|
||||
scraper = create_scraper(
|
||||
browser={
|
||||
'browser': 'chrome',
|
||||
'platform': 'windows',
|
||||
'desktop': True
|
||||
},
|
||||
sess=session()
|
||||
)
|
||||
scraper.headers = {
|
||||
'Accept': '*/*',
|
||||
'Accept-Encoding': 'deflate,gzip,br',
|
||||
}
|
||||
return scraper
|
||||
|
||||
def start_conversation(scraper: CloudScraper):
|
||||
response = scraper.post('https://pi.ai/api/chat/start', data="{}", headers={
|
||||
'accept': 'application/json',
|
||||
'x-api-version': '3'
|
||||
})
|
||||
if 'Just a moment' in response.text:
|
||||
raise RuntimeError('Error: Cloudflare detected')
|
||||
return Conversation(
|
||||
response.json()['conversations'][0]['sid'],
|
||||
response.cookies
|
||||
)
|
||||
|
||||
def get_chat_history(scraper: CloudScraper, conversation: Conversation):
|
||||
params = {
|
||||
'conversation': conversation.sid,
|
||||
}
|
||||
response = scraper.get('https://pi.ai/api/chat/history', params=params, cookies=conversation.cookies)
|
||||
if 'Just a moment' in response.text:
|
||||
raise RuntimeError('Error: Cloudflare detected')
|
||||
return response.json()
|
||||
|
||||
def ask(scraper: CloudScraper, messages: Messages, conversation: Conversation):
|
||||
json_data = {
|
||||
'text': format_prompt(messages),
|
||||
'conversation': conversation.sid,
|
||||
'mode': 'BASE',
|
||||
}
|
||||
response = scraper.post('https://pi.ai/api/chat', json=json_data, cookies=conversation.cookies, stream=True)
|
||||
|
||||
for line in response.iter_lines(chunk_size=1024, decode_unicode=True):
|
||||
if 'Just a moment' in line:
|
||||
raise RuntimeError('Error: Cloudflare detected')
|
||||
if line.startswith('data: {"text":'):
|
||||
yield json.loads(line.split('data: ')[1])
|
||||
if line.startswith('data: {"title":'):
|
||||
yield json.loads(line.split('data: ')[1])
|
||||
|
||||
class Conversation():
|
||||
def __init__(self, sid: str, cookies):
|
||||
self.sid = sid
|
||||
self.cookies = cookies
|
||||
|
@ -1,25 +0,0 @@
|
||||
from g4f import Provider
|
||||
|
||||
import g4f
|
||||
|
||||
Conversation = Provider.PI.Start_Conversation()
|
||||
|
||||
Chat_History = Provider.PI.GetChatHistory(Conversation)
|
||||
|
||||
response = g4f.ChatCompletion.create(
|
||||
model="pi",
|
||||
provider=g4f.Provider.PI,
|
||||
messages=[
|
||||
{
|
||||
"role": "user",
|
||||
"content": 'Hello who are you?'
|
||||
}
|
||||
],
|
||||
stream=False,
|
||||
conversation=Conversation
|
||||
)
|
||||
|
||||
for message in response:
|
||||
print(message, flush=True, end='')
|
||||
|
||||
Chat_Title = Provider.PI.GetConversationTitle(Conversation)
|
Loading…
Reference in New Issue