mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-11-05 00:01:00 +00:00
b0276f6c9e
* Added new provider PI (Hacky way to use) * Updated models endpoint made it show real info about the model.py * Added cloudscraper to the requirements * Fixed some bugs aka made streaming also return role
189 lines
7.5 KiB
Python
189 lines
7.5 KiB
Python
import ast
|
|
import logging
|
|
|
|
from fastapi import FastAPI, Response, Request
|
|
from fastapi.responses import StreamingResponse
|
|
from typing import List, Union, Any, Dict, AnyStr
|
|
#from ._tokenizer import tokenize
|
|
from .. import BaseProvider
|
|
|
|
import time
|
|
import json
|
|
import random
|
|
import string
|
|
import uvicorn
|
|
import nest_asyncio
|
|
import g4f
|
|
|
|
class Api:
    """OpenAI-style HTTP facade over g4f, served through FastAPI.

    All routes are registered on ``self.app`` inside ``__init__``:

    * ``GET /`` and ``GET /v1``: small informational JSON documents.
    * ``GET /v1/models`` and ``GET /v1/models/{model_name}``: model metadata
      looked up in ``g4f.ModelUtils.convert``.
    * ``POST /v1/chat/completions``: forwards the request to
      ``g4f.ChatCompletion.create`` and answers with a single JSON document
      or, when ``stream`` is requested, a ``text/event-stream``.
    * ``POST /v1/completions``: placeholder that reports "Not working yet.".
    """

    @staticmethod
    def _json_response(payload, status_code: int = 200):
        """Return *payload* as an indented ``application/json`` response."""
        return Response(content=json.dumps(payload, indent=4),
                        media_type="application/json",
                        status_code=status_code)

    @staticmethod
    def _decode_keys(item) -> dict:
        """Return a copy of *item* whose ``bytes`` keys are decoded to ``str``.

        Values are passed through untouched so that nested structures such
        as ``messages`` (list) or ``conversation`` (dict) keep their types.
        Anything that is not a dict (``None``, a JSON array, ...) yields an
        empty dict.
        """
        if not isinstance(item, dict):
            return {}
        return {
            key.decode('utf-8') if isinstance(key, bytes) else key: value
            for key, value in item.items()
        }

    @staticmethod
    def _as_bool(value) -> bool:
        """Interpret a request flag: real booleans, or the string "true"
        in any letter case. Every other value counts as ``False``."""
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return False

    @staticmethod
    def _parse_messages(messages):
        """Return *messages* as a Python object.

        Non-string input is returned unchanged. A string is first parsed as
        JSON and, failing that, as a Python literal (the format older
        clients of this endpoint relied on).

        Raises:
            ValueError, TypeError, SyntaxError, MemoryError, RecursionError:
                when the string is neither valid JSON nor a valid literal.
        """
        if not isinstance(messages, str):
            return messages
        try:
            return json.loads(messages)
        except ValueError:
            # NOTE(security): literal_eval does not execute code, but very
            # large or deeply nested client input can still exhaust
            # memory/recursion; callers must handle those exceptions.
            return ast.literal_eval(messages)

    @staticmethod
    def _chunk(completion_id, created, model, delta, finish_reason) -> dict:
        """Build one ``chat.completion.chunk`` payload for the event stream."""
        return {
            'id': f'chatcmpl-{completion_id}',
            'object': 'chat.completion.chunk',
            'created': created,
            'model': model,
            'choices': [
                {
                    'index': 0,
                    'delta': delta,
                    'finish_reason': finish_reason,
                }
            ],
        }

    @staticmethod
    def _split_address(ip):
        """Split ``"host:port"`` into ``(host, port)``.

        The split happens on the LAST colon so hosts that contain colons
        themselves (IPv6) survive; surrounding ``[]`` are stripped.

        Raises:
            IndexError: when *ip* contains no ``:`` separator.
            ValueError: when the port part is not an integer.
        """
        parts = ip.rsplit(":", 1)
        return parts[0].strip("[]"), int(parts[1])

    # ``engine`` is annotated ``Any``: callers pass the g4f module, which is
    # not a type. "BaseProvider" is a forward reference to the name imported
    # at the top of the file.
    def __init__(self, engine: Any, debug: bool = True, sentry: bool = False,
                 list_ignored_providers: Union[List[Union[str, "BaseProvider"]], None] = None) -> None:
        """Create the FastAPI application and register every route.

        Args:
            engine: stored on ``self.engine``; the routes below call the
                module-level ``g4f`` directly and do not read it.
            debug: stored on ``self.debug``; not read inside this class.
            sentry: stored on ``self.sentry``; not read inside this class.
            list_ignored_providers: forwarded as ``ignored=`` to
                ``g4f.ChatCompletion.create`` on every chat completion.
        """
        self.engine = engine
        self.debug = debug
        self.sentry = sentry
        self.list_ignored_providers = list_ignored_providers

        self.app = FastAPI()
        # NOTE(review): presumably required because g4f drives its own event
        # loop from inside the already running server loop - confirm.
        nest_asyncio.apply()

        # Body type accepted by the chat endpoint. Keys are AnyStr, so they
        # may arrive as bytes and are decoded in _decode_keys().
        JSONObject = Dict[AnyStr, Any]
        JSONArray = List[Any]
        JSONStructure = Union[JSONArray, JSONObject]

        @self.app.get("/")
        async def read_root():
            return self._json_response({"info": "g4f API"})

        @self.app.get("/v1")
        async def read_root_v1():
            return self._json_response({"info": "Go to /v1/chat/completions or /v1/models."})

        @self.app.get("/v1/models")
        async def models():
            model_list = [
                {
                    'id': model,
                    'object': 'model',
                    'created': 0,
                    'owned_by': g4f.ModelUtils.convert[model].base_provider,
                }
                for model in g4f.Model.__all__()
            ]
            return self._json_response({'object': 'list', 'data': model_list})

        @self.app.get("/v1/models/{model_name}")
        async def model_info(model_name: str):
            try:
                info = g4f.ModelUtils.convert[model_name]
            except KeyError:
                # Unknown model name. Other failures are no longer hidden
                # behind this message.
                return self._json_response({"error": "The model does not exist."})

            return self._json_response({
                'id': model_name,
                'object': 'model',
                'created': 0,
                'owned_by': info.base_provider,
            })

        @self.app.post("/v1/chat/completions")
        async def chat_completions(request: Request, item: JSONStructure = None):
            item_data = {
                'model': 'gpt-3.5-turbo',
                'stream': False,
            }
            # Only the keys are normalised; values keep the types the client
            # sent, so a dict `conversation` is not turned into a string.
            item_data.update(self._decode_keys(item))

            try:
                messages = self._parse_messages(item_data.get('messages'))
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                return self._json_response(
                    {"error": "The 'messages' field could not be parsed."},
                    status_code=400)

            model = item_data.get('model')
            stream = self._as_bool(item_data.get('stream'))
            conversation = item_data.get('conversation')

            create_kwargs = {
                'model': model,
                'stream': stream,
                'messages': messages,
                'ignored': self.list_ignored_providers,
            }
            if model == 'pi':
                # Only the 'pi' model is handed the conversation state.
                create_kwargs['conversation'] = conversation

            try:
                response = g4f.ChatCompletion.create(**create_kwargs)
            except Exception as e:
                # Top-level boundary: log the traceback, answer with JSON.
                logging.exception(e)
                return self._json_response({"error": "An error occurred while generating the response."})

            completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
            completion_timestamp = int(time.time())

            if not stream:
                # Token counting is not implemented; usage is reported as 0.
                return self._json_response({
                    'id': f'chatcmpl-{completion_id}',
                    'object': 'chat.completion',
                    'created': completion_timestamp,
                    'model': model,
                    'choices': [
                        {
                            'index': 0,
                            'message': {
                                'role': 'assistant',
                                'content': response,
                            },
                            'finish_reason': 'stop',
                        }
                    ],
                    'usage': {
                        'prompt_tokens': 0,
                        'completion_tokens': 0,
                        'total_tokens': 0,
                    },
                })

            def streaming():
                """Yield one server-sent event per chunk, then a final
                event whose ``finish_reason`` is ``'stop'``."""
                try:
                    for chunk in response:
                        delta = {'role': 'assistant', 'content': chunk}
                        content = json.dumps(
                            self._chunk(completion_id, completion_timestamp, model, delta, None),
                            separators=(',', ':'))
                        yield f'data: {content}\n\n'
                        # Short pause between chunks, kept from the
                        # original implementation.
                        time.sleep(0.03)

                    content = json.dumps(
                        self._chunk(completion_id, completion_timestamp, model, {}, 'stop'),
                        separators=(',', ':'))
                    yield f'data: {content}\n\n'
                except GeneratorExit:
                    # The generator was closed before it finished;
                    # nothing to clean up.
                    pass

            return StreamingResponse(streaming(), media_type="text/event-stream")

        @self.app.post("/v1/completions")
        async def completions():
            return self._json_response({'info': 'Not working yet.'})

    def run(self, ip):
        """Serve ``self.app`` with uvicorn on the ``"host:port"`` string *ip*."""
        host, port = self._split_address(ip)
        uvicorn.run(app=self.app, host=host, port=port, use_colors=False)
|