2024-01-14 06:45:41 +00:00
"""
This module provides functionalities for creating and managing images using Bing ' s service.
It includes functions for user login , session creation , image creation , and processing .
"""
2024-01-10 09:34:56 +00:00
import asyncio
2024-01-14 06:45:41 +00:00
import time
import json
import os
2024-01-10 09:34:56 +00:00
from aiohttp import ClientSession
from bs4 import BeautifulSoup
from urllib . parse import quote
2024-01-14 06:45:41 +00:00
from typing import Generator , List , Dict
2024-01-10 09:34:56 +00:00
from . . create_images import CreateImagesProvider
from . . helper import get_cookies , get_event_loop
from . . . webdriver import WebDriver , get_driver_cookies , get_browser
from . . . base_provider import ProviderType
2024-01-13 14:37:36 +00:00
from . . . image import format_images_markdown
2024-01-10 09:34:56 +00:00
BING_URL = " https://www.bing.com "
2024-01-14 06:45:41 +00:00
TIMEOUT_LOGIN = 1200
TIMEOUT_IMAGE_CREATION = 300
ERRORS = [
" this prompt is being reviewed " ,
" this prompt has been blocked " ,
" we ' re working hard to offer image creator in more languages " ,
" we can ' t create your images right now "
]
BAD_IMAGES = [
" https://r.bing.com/rp/in-2zU3AJUdkgFe7ZKv19yPBHVs.png " ,
" https://r.bing.com/rp/TX9QuO3WzcCJz1uaaSwQAz39Kb0.jpg " ,
]
def wait_for_login ( driver : WebDriver , timeout : int = TIMEOUT_LOGIN ) - > None :
"""
Waits for the user to log in within a given timeout period .
2024-01-10 09:34:56 +00:00
2024-01-14 06:45:41 +00:00
Args :
driver ( WebDriver ) : Webdriver for browser automation .
timeout ( int ) : Maximum waiting time in seconds .
Raises :
RuntimeError : If the login process exceeds the timeout .
"""
2024-01-10 09:34:56 +00:00
driver . get ( f " { BING_URL } / " )
start_time = time . time ( )
2024-01-14 06:45:41 +00:00
while not driver . get_cookie ( " _U " ) :
2024-01-10 09:34:56 +00:00
if time . time ( ) - start_time > timeout :
raise RuntimeError ( " Timeout error " )
time . sleep ( 0.5 )
2024-01-14 06:45:41 +00:00
def create_session ( cookies : Dict [ str , str ] ) - > ClientSession :
"""
Creates a new client session with specified cookies and headers .
Args :
cookies ( Dict [ str , str ] ) : Cookies to be used for the session .
Returns :
ClientSession : The created client session .
"""
2024-01-10 09:34:56 +00:00
headers = {
" accept " : " text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7 " ,
" accept-encoding " : " gzip, deflate, br " ,
" accept-language " : " en-US,en;q=0.9,zh-CN;q=0.8,zh-TW;q=0.7,zh;q=0.6 " ,
" content-type " : " application/x-www-form-urlencoded " ,
" referrer-policy " : " origin-when-cross-origin " ,
" referrer " : " https://www.bing.com/images/create/ " ,
" origin " : " https://www.bing.com " ,
" user-agent " : " Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36 Edg/111.0.1661.54 " ,
" sec-ch-ua " : " \" Microsoft Edge \" ;v= \" 111 \" , \" Not(A:Brand \" ;v= \" 8 \" , \" Chromium \" ;v= \" 111 \" " ,
" sec-ch-ua-mobile " : " ?0 " ,
" sec-fetch-dest " : " document " ,
" sec-fetch-mode " : " navigate " ,
" sec-fetch-site " : " same-origin " ,
" sec-fetch-user " : " ?1 " ,
" upgrade-insecure-requests " : " 1 " ,
}
if cookies :
2024-01-14 06:45:41 +00:00
headers [ " Cookie " ] = " ; " . join ( f " { k } = { v } " for k , v in cookies . items ( ) )
2024-01-10 09:34:56 +00:00
return ClientSession ( headers = headers )
2024-01-14 06:45:41 +00:00
async def create_images ( session : ClientSession , prompt : str , proxy : str = None , timeout : int = TIMEOUT_IMAGE_CREATION ) - > List [ str ] :
"""
Creates images based on a given prompt using Bing ' s service.
Args :
session ( ClientSession ) : Active client session .
prompt ( str ) : Prompt to generate images .
proxy ( str , optional ) : Proxy configuration .
timeout ( int ) : Timeout for the request .
Returns :
List [ str ] : A list of URLs to the created images .
Raises :
RuntimeError : If image creation fails or times out .
"""
url_encoded_prompt = quote ( prompt )
2024-01-10 09:34:56 +00:00
payload = f " q= { url_encoded_prompt } &rt=4&FORM=GENCRE "
url = f " { BING_URL } /images/create?q= { url_encoded_prompt } &rt=4&FORM=GENCRE "
2024-01-14 06:45:41 +00:00
async with session . post ( url , allow_redirects = False , data = payload , timeout = timeout ) as response :
2024-01-10 09:34:56 +00:00
response . raise_for_status ( )
text = ( await response . text ( ) ) . lower ( )
2024-01-14 06:45:41 +00:00
for error in ERRORS :
2024-01-10 09:34:56 +00:00
if error in text :
raise RuntimeError ( f " Create images failed: { error } " )
if response . status != 302 :
url = f " { BING_URL } /images/create?q= { url_encoded_prompt } &rt=3&FORM=GENCRE "
async with session . post ( url , allow_redirects = False , proxy = proxy , timeout = timeout ) as response :
if response . status != 302 :
2024-01-13 14:37:36 +00:00
raise RuntimeError ( f " Create images failed. Code: { response . status } " )
2024-01-10 09:34:56 +00:00
redirect_url = response . headers [ " Location " ] . replace ( " &nfy=1 " , " " )
redirect_url = f " { BING_URL } { redirect_url } "
request_id = redirect_url . split ( " id= " ) [ 1 ]
async with session . get ( redirect_url ) as response :
response . raise_for_status ( )
polling_url = f " { BING_URL } /images/create/async/results/ { request_id } ?q= { url_encoded_prompt } "
start_time = time . time ( )
while True :
if time . time ( ) - start_time > timeout :
2024-01-13 14:37:36 +00:00
raise RuntimeError ( f " Timeout error after { timeout } sec " )
2024-01-10 09:34:56 +00:00
async with session . get ( polling_url ) as response :
if response . status != 200 :
2024-01-13 14:37:36 +00:00
raise RuntimeError ( f " Polling images faild. Code: { response . status } " )
2024-01-10 09:34:56 +00:00
text = await response . text ( )
if not text :
await asyncio . sleep ( 1 )
else :
break
error = None
try :
error = json . loads ( text ) . get ( " errorMessage " )
except :
pass
if error == " Pending " :
raise RuntimeError ( " Prompt is been blocked " )
elif error :
raise RuntimeError ( error )
return read_images ( text )
2024-01-14 06:45:41 +00:00
def read_images ( html_content : str ) - > List [ str ] :
"""
Extracts image URLs from the HTML content .
Args :
html_content ( str ) : HTML content containing image URLs .
Returns :
List [ str ] : A list of image URLs .
"""
soup = BeautifulSoup ( html_content , " html.parser " )
tags = soup . find_all ( " img " , class_ = " mimg " )
images = [ img [ " src " ] . split ( " ?w= " ) [ 0 ] for img in tags ]
if any ( im in BAD_IMAGES for im in images ) :
2024-01-10 09:34:56 +00:00
raise RuntimeError ( " Bad images found " )
if not images :
raise RuntimeError ( " No images found " )
return images
2024-01-14 06:45:41 +00:00
async def create_images_markdown ( cookies : Dict [ str , str ] , prompt : str , proxy : str = None ) - > str :
"""
Creates markdown formatted string with images based on the prompt .
Args :
cookies ( Dict [ str , str ] ) : Cookies to be used for the session .
prompt ( str ) : Prompt to generate images .
proxy ( str , optional ) : Proxy configuration .
Returns :
str : Markdown formatted string with images .
"""
async with create_session ( cookies ) as session :
2024-01-10 09:34:56 +00:00
images = await create_images ( session , prompt , proxy )
return format_images_markdown ( images , prompt )
2024-01-14 06:45:41 +00:00
def get_cookies_from_browser ( proxy : str = None ) - > Dict [ str , str ] :
"""
Retrieves cookies from the browser using webdriver .
Args :
proxy ( str , optional ) : Proxy configuration .
Returns :
Dict [ str , str ] : Retrieved cookies .
"""
with get_browser ( proxy = proxy ) as driver :
2024-01-10 09:34:56 +00:00
wait_for_login ( driver )
2024-01-14 06:45:41 +00:00
time . sleep ( 1 )
2024-01-10 09:34:56 +00:00
return get_driver_cookies ( driver )
2024-01-14 06:45:41 +00:00
class CreateImagesBing :
""" A class for creating images using Bing. """
_cookies : Dict [ str , str ] = { }
@classmethod
2024-01-14 14:04:37 +00:00
def create_completion ( cls , prompt : str , cookies : Dict [ str , str ] = None , proxy : str = None ) - > Generator [ str , None , None ] :
2024-01-14 06:45:41 +00:00
"""
Generator for creating imagecompletion based on a prompt .
Args :
prompt ( str ) : Prompt to generate images .
cookies ( Dict [ str , str ] , optional ) : Cookies for the session . If None , cookies are retrieved automatically .
proxy ( str , optional ) : Proxy configuration .
Yields :
Generator [ str , None , None ] : The final output as markdown formatted string with images .
"""
loop = get_event_loop ( )
cookies = cookies or cls . _cookies or get_cookies ( " .bing.com " )
if " _U " not in cookies :
login_url = os . environ . get ( " G4F_LOGIN_URL " )
if login_url :
yield f " Please login: [Bing]( { login_url } ) \n \n "
cls . _cookies = cookies = get_cookies_from_browser ( proxy )
yield loop . run_until_complete ( create_images_markdown ( cookies , prompt , proxy ) )
@classmethod
async def create_async ( cls , prompt : str , cookies : Dict [ str , str ] = None , proxy : str = None ) - > str :
"""
Asynchronously creates a markdown formatted string with images based on the prompt .
Args :
prompt ( str ) : Prompt to generate images .
cookies ( Dict [ str , str ] , optional ) : Cookies for the session . If None , cookies are retrieved automatically .
proxy ( str , optional ) : Proxy configuration .
Returns :
str : Markdown formatted string with images .
"""
cookies = cookies or cls . _cookies or get_cookies ( " .bing.com " )
if " _U " not in cookies :
cls . _cookies = cookies = get_cookies_from_browser ( proxy )
return await create_images_markdown ( cookies , prompt , proxy )
2024-01-10 09:34:56 +00:00
def patch_provider ( provider : ProviderType ) - > CreateImagesProvider :
2024-01-14 06:45:41 +00:00
"""
Patches a provider to include image creation capabilities .
Args :
provider ( ProviderType ) : The provider to be patched .
Returns :
CreateImagesProvider : The patched provider with image creation capabilities .
"""
return CreateImagesProvider ( provider , CreateImagesBing . create_completion , CreateImagesBing . create_async )