You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gpt4free/g4f/api/__init__.py

227 lines
8.0 KiB
Python

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

import typing
from .. import BaseProvider
import g4f; g4f.debug.logging = True
import time
import json
import random
import string
import logging
from typing import Union
from loguru import logger
from waitress import serve
from ._logging import hook_logging
from ._tokenizer import tokenize
from flask_cors import CORS
from werkzeug.serving import WSGIRequestHandler
from werkzeug.exceptions import default_exceptions
from werkzeug.middleware.proxy_fix import ProxyFix
from flask import (
Flask,
jsonify,
make_response,
request,
)
class Api:
    """Flask application exposing OpenAI-style ``/v1`` endpoints over a g4f engine.

    The instance wraps a Flask app (behind ``ProxyFix``) and serves it with
    waitress via :meth:`run`.  Routes, CORS and error handlers are registered
    when :meth:`run` is called, not at construction time.
    """

    # Fallbacks used by ``__parse_bind`` when the bind string omits a part.
    __default_ip = '127.0.0.1'
    __default_port = 1337

    def __init__(self, engine: g4f, debug: bool = True, sentry: bool = False,
                 list_ignored_providers: typing.Optional[
                     typing.List[typing.Union[str, BaseProvider]]] = None) -> None:
        """Create the Flask app and configure logging.

        Args:
            engine: Object whose ``ChatCompletion.create`` produces completions
                (the ``g4f`` module is what the annotation names).
            debug: When true, log at DEBUG level and expose original exception
                text in error responses; otherwise log at WARN level.
            sentry: Stored on the instance; not otherwise used in this class.
            list_ignored_providers: Forwarded as ``ignored=`` to
                ``ChatCompletion.create`` on every chat request.
        """
        self.engine = engine
        self.debug = debug
        self.sentry = sentry
        self.list_ignored_providers = list_ignored_providers
        self.log_level = logging.DEBUG if debug else logging.WARN
        hook_logging(level=self.log_level,
                     format='[%(asctime)s] %(levelname)s in %(module)s: %(message)s')
        self.logger = logging.getLogger('waitress')

        self.app = Flask(__name__)
        self.app.wsgi_app = ProxyFix(self.app.wsgi_app, x_port=1)
        self.app.after_request(self.__after_request)

    def run(self, bind_str, threads=8):
        """Register CORS, routes and error handlers, then serve with waitress.

        Args:
            bind_str: ``"host:port"``, a bare port, or a bare host
                (see ``__parse_bind``).
            threads: Number of waitress worker threads.
        """
        host, port = self.__parse_bind(bind_str)

        CORS(self.app, resources={r'/v1/*': {'supports_credentials': True, 'expose_headers': [
            'Content-Type',
            'Authorization',
            'X-Requested-With',
            'Accept',
            'Origin',
            'Access-Control-Request-Method',
            'Access-Control-Request-Headers',
            'Content-Disposition'], 'max_age': 600}})

        self.app.route('/v1/models', methods=['GET'])(self.models)
        # Flask passes URL variables to the view as keyword arguments, so the
        # variable name must match ``model_info``'s ``model_name`` parameter.
        self.app.route('/v1/models/<model_name>', methods=['GET'])(self.model_info)
        self.app.route('/v1/chat/completions', methods=['POST'])(self.chat_completions)
        self.app.route('/v1/completions', methods=['POST'])(self.completions)

        # Answer every standard HTTP error with the JSON error body.
        for ex in default_exceptions:
            self.app.register_error_handler(ex, self.__handle_error)

        if not self.debug:
            self.logger.warning(f'Serving on http://{host}:{port}')

        WSGIRequestHandler.protocol_version = 'HTTP/1.1'
        serve(self.app, host=host, port=port, ident=None, threads=threads)

    def __handle_error(self, e: Exception):
        """Log ``e`` and turn it into a JSON ``{'code', 'message'}`` response.

        The HTTP status mirrors ``e.code`` when the exception carries one and
        falls back to 500 otherwise.  In debug mode the text of
        ``e.original_exception`` is used as the message when it is set.
        """
        self.logger.error(e)

        status = getattr(e, 'code', None) or 500
        original = getattr(e, 'original_exception', None)
        if self.debug and original is not None:
            message = str(original)
        else:
            message = str(getattr(e, 'name', type(e).__name__))

        return make_response(jsonify({'code': status, 'message': message}), status)

    @staticmethod
    def __after_request(resp):
        """Stamp every response with an ``X-Server`` header naming the g4f version."""
        resp.headers['X-Server'] = f'g4f/{g4f.version}'
        return resp

    def __parse_bind(self, bind_str):
        """Split a bind string into ``(host, port)``.

        * ``"8080"``       -> default IP, port 8080
        * ``"example"``    -> host ``"example"``, default port
        * ``"host:8080"``  -> ``("host", 8080)``
        * ``"host:"``      -> host with the default port

        Raises:
            ValueError: If a non-empty port section is not an integer.
        """
        sections = bind_str.split(':', 2)
        if len(sections) < 2:
            # A lone token is a port if it parses as an integer, else a host.
            try:
                return self.__default_ip, int(sections[0])
            except ValueError:
                return sections[0], self.__default_port

        host, port = sections[0], sections[1]
        # An empty port section used to crash in int(''); use the default.
        return host, int(port) if port else self.__default_port

    @staticmethod
    def _load_proxy(path='config.json'):
        """Return the ``"proxy"`` entry of the JSON file at ``path``, or ``None``.

        Best effort: a missing/unreadable file, invalid JSON, or a document
        without a ``"proxy"`` key all yield ``None``.  A relative ``path`` is
        resolved against the current working directory.
        """
        try:
            with open(path, 'r', encoding='utf-8') as config_file:
                return json.load(config_file)['proxy']
        except (OSError, ValueError, KeyError, TypeError):
            return None

    @staticmethod
    def _sse_chunk(completion_id, created, model, delta, finish_reason):
        """Format one ``chat.completion.chunk`` as a server-sent-event line."""
        payload = {
            'id': f'chatcmpl-{completion_id}',
            'object': 'chat.completion.chunk',
            'created': created,
            'model': model,
            'choices': [
                {
                    'index': 0,
                    'delta': delta,
                    'finish_reason': finish_reason,
                }
            ],
        }
        content = json.dumps(payload, separators=(',', ':'))
        return f'data: {content}\n\n'

    async def home(self):
        """Return a greeting string (``run`` does not register this view)."""
        return 'Hello world | https://127.0.0.1:1337/v1'

    async def chat_completions(self):
        """Handle ``POST /v1/chat/completions``.

        Reads ``model``, ``stream`` and ``messages`` from the JSON body, asks
        the engine for a completion and returns either a single
        ``chat.completion`` object or, when ``stream`` is truthy, a
        ``text/event-stream`` response of ``chat.completion.chunk`` events.
        A missing or empty ``messages`` list is answered with a 400 error.
        """
        payload = request.json
        model = payload.get('model', 'gpt-3.5-turbo')
        stream = payload.get('stream', False)
        messages = payload.get('messages')

        if not isinstance(messages, list) or not messages:
            return make_response(jsonify({
                'code': 400,
                'message': 'messages must be a non-empty list'}), 400)

        logger.info(f'model: {model}, stream: {stream}, request: {messages[-1]["content"]}')

        create_kwargs = {
            'model': model,
            'stream': stream,
            'messages': messages,
            'ignored': self.list_ignored_providers,
        }
        # The config file is re-read on every request; ``proxy`` is only
        # passed to the engine when one is configured.
        proxy = self._load_proxy()
        if proxy is not None:
            create_kwargs['proxy'] = proxy
        response = self.engine.ChatCompletion.create(**create_kwargs)

        completion_id = ''.join(random.choices(string.ascii_letters + string.digits, k=28))
        completion_timestamp = int(time.time())

        if not stream:
            prompt_tokens, _ = tokenize(''.join(message['content'] for message in messages))
            completion_tokens, _ = tokenize(response)
            return {
                'id': f'chatcmpl-{completion_id}',
                'object': 'chat.completion',
                'created': completion_timestamp,
                'model': model,
                'choices': [
                    {
                        'index': 0,
                        'message': {
                            'role': 'assistant',
                            'content': response,
                        },
                        'finish_reason': 'stop',
                    }
                ],
                'usage': {
                    'prompt_tokens': prompt_tokens,
                    'completion_tokens': completion_tokens,
                    'total_tokens': prompt_tokens + completion_tokens,
                },
            }

        def streaming():
            """Yield one SSE event per engine chunk, then a final 'stop' event."""
            try:
                for chunk in response:
                    yield self._sse_chunk(completion_id, completion_timestamp, model,
                                          {'content': chunk}, None)
                    time.sleep(0.03)
                yield self._sse_chunk(completion_id, completion_timestamp, model,
                                      {}, 'stop')
                logger.success(f'model: {model}, stream: {stream}')
            except GeneratorExit:
                # Generator closed before exhaustion (presumably the client
                # disconnected); end quietly.
                pass

        return self.app.response_class(streaming(), mimetype='text/event-stream')

    async def completions(self):
        """Handle ``POST /v1/completions``; not implemented, always answers 500."""
        return 'not working yet', 500

    async def model_info(self, model_name):
        """Handle ``GET /v1/models/<model_name>``: describe a single model.

        Returns a 404 JSON error when ``model_name`` is not a key of
        ``g4f.ModelUtils.convert``.
        """
        try:
            model_info = g4f.ModelUtils.convert[model_name]
        except KeyError:
            return make_response(jsonify({
                'code': 404,
                'message': f'model {model_name!r} not found'}), 404)

        return jsonify({
            'id': model_name,
            'object': 'model',
            'created': 0,
            'owned_by': model_info.base_provider
        })

    async def models(self):
        """Handle ``GET /v1/models``: list every model from ``g4f.Model.__all__()``."""
        model_list = [{
            'id': model,
            'object': 'model',
            'created': 0,
            'owned_by': 'g4f'} for model in g4f.Model.__all__()]

        return jsonify({
            'object': 'list',
            'data': model_list})