Add websearch to gui (#1314)

* Add websearch to gui
* Fix version_check config
* Add version badge in README.md
* Show version in gui
* Add docker hub build
* Fix gui backend, improve style
pull/1320/head
H Lohaus 10 months ago committed by GitHub
parent 5862d55abf
commit 484b96d850
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,21 +9,30 @@ jobs:
publish: publish:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Setup Buildx
uses: docker/setup-buildx-action@v3
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Get metadata for Docker - name: Get metadata for Docker
id: metadata id: metadata
uses: docker/metadata-action@v5 uses: docker/metadata-action@v5
with: with:
images: ghcr.io/${{ github.repository }} images: |
hlohaus789/g4f
ghcr.io/${{ github.repository }}
- name: Log in to Docker Hub
uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
uses: docker/login-action@v3 uses: docker/login-action@v3
with: with:
registry: ghcr.io registry: ghcr.io
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GHCR_PAT }} password: ${{ secrets.GHCR_PAT }}
- name: Build and push image - name: Build and push image
uses: docker/build-push-action@v5 uses: docker/build-push-action@v5
with: with:
@ -32,3 +41,5 @@ jobs:
push: true push: true
tags: ${{ steps.metadata.outputs.tags }} tags: ${{ steps.metadata.outputs.tags }}
labels: ${{ steps.metadata.outputs.labels }} labels: ${{ steps.metadata.outputs.labels }}
build-args: |
G4F_VERSION=${{ github.ref_name }}

@ -1,4 +1,4 @@
![g4f](g4f.png) ![248433934-7886223b-c1d1-4260-82aa-da5741f303bb](https://github.com/xtekky/gpt4free/assets/98614666/ea012c87-76e0-496a-8ac4-e2de090cc6c9)
<a href='https://ko-fi.com/xtekky' target='_blank'><img height='35' style='border:0px;height:46px;' src='https://az743702.vo.msecnd.net/cdn/kofi3.png?v=0' border='0' alt='Buy Me a Coffee at ko-fi.com' /> <a href='https://ko-fi.com/xtekky' target='_blank'><img height='35' style='border:0px;height:46px;' src='https://az743702.vo.msecnd.net/cdn/kofi3.png?v=0' border='0' alt='Buy Me a Coffee at ko-fi.com' />
<div id="top"></div> <div id="top"></div>
@ -6,7 +6,8 @@
> By using this repository or any code related to it, you agree to the [legal notice](LEGAL_NOTICE.md). The author is not responsible for any copies, forks, re-uploads made by other users, or anything else related to GPT4Free. This is the author's only account and repository. To prevent impersonation or irresponsible actions, please comply with the GNU GPL license this Repository uses. > By using this repository or any code related to it, you agree to the [legal notice](LEGAL_NOTICE.md). The author is not responsible for any copies, forks, re-uploads made by other users, or anything else related to GPT4Free. This is the author's only account and repository. To prevent impersonation or irresponsible actions, please comply with the GNU GPL license this Repository uses.
> [!Note] > [!Note]
> Latest pypi version: [`0.1.9.2`](https://pypi.org/project/g4f/0.1.9.2) Lastet version:
>> [![PyPI version](https://badge.fury.io/py/g4f.svg)](https://pypi.org/project/g4f)
```sh ```sh
pip install -U g4f pip install -U g4f
``` ```
@ -165,8 +166,7 @@ docker-compose down
import g4f import g4f
g4f.debug.logging = True # Enable logging g4f.debug.logging = True # Enable logging
g4f.check_version = False # Disable automatic version checking g4f.debug.check_version = False # Disable automatic version checking
print(g4f.version) # Check version
print(g4f.Provider.Ails.params) # Supported args print(g4f.Provider.Ails.params) # Supported args
# Automatic selection of provider # Automatic selection of provider

@ -2,13 +2,13 @@ version: '3'
services: services:
gpt4free: gpt4free:
image: ghcr.io/xtekky/gpt4free:latest image: hlohaus789/g4f:latest
shm_size: 2gb shm_size: 2gb
build: build:
context: . context: .
dockerfile: docker/Dockerfile dockerfile: docker/Dockerfile
volumes: # volumes:
- .:/app # - .:/app
ports: ports:
- '8080:80' - '8080:80'
- '1337:1337' - '1337:1337'

@ -3,6 +3,8 @@ FROM selenium/node-chrome
ENV SE_SCREEN_WIDTH 1850 ENV SE_SCREEN_WIDTH 1850
ENV G4F_LOGIN_URL http://localhost:7900/?autoconnect=1&resize=scale&password=secret ENV G4F_LOGIN_URL http://localhost:7900/?autoconnect=1&resize=scale&password=secret
ENV PYTHONUNBUFFERED 1 ENV PYTHONUNBUFFERED 1
ARG G4F_VERSION
ENV G4F_VERSION ${G4F_VERSION}
USER root USER root
@ -22,7 +24,7 @@ RUN rm -rf /var/lib/apt/lists/* /var/cache/apt/* \
COPY docker/supervisor.conf /etc/supervisor/conf.d/selenium.conf COPY docker/supervisor.conf /etc/supervisor/conf.d/selenium.conf
# Change background image # Change background image
COPY g4f.png /usr/share/images/fluxbox/ubuntu-light.png COPY docker/background.png /usr/share/images/fluxbox/ubuntu-light.png
# Switch user # Switch user
USER 1200 USER 1200

Before

Width:  |  Height:  |  Size: 152 KiB

After

Width:  |  Height:  |  Size: 152 KiB

@ -49,7 +49,7 @@ stderr_logfile_backups=5
stdout_capture_maxbytes=50MB stdout_capture_maxbytes=50MB
stderr_capture_maxbytes=50MB stderr_capture_maxbytes=50MB
[program:g4f-cli] [program:g4f-api]
priority=15 priority=15
command=python -m g4f.cli api command=python -m g4f.cli api
directory=/app directory=/app

@ -5,7 +5,7 @@ import time, json
from ..typing import CreateResult, Messages from ..typing import CreateResult, Messages
from .base_provider import BaseProvider from .base_provider import BaseProvider
from .helper import format_prompt from .helper import format_prompt
from ..webdriver import WebDriver, WebDriverSession from ..webdriver import WebDriver, WebDriverSession, bypass_cloudflare
class MyShell(BaseProvider): class MyShell(BaseProvider):
url = "https://app.myshell.ai/chat" url = "https://app.myshell.ai/chat"
@ -25,16 +25,8 @@ class MyShell(BaseProvider):
**kwargs **kwargs
) -> CreateResult: ) -> CreateResult:
with WebDriverSession(webdriver, "", proxy=proxy) as driver: with WebDriverSession(webdriver, "", proxy=proxy) as driver:
from selenium.webdriver.common.by import By bypass_cloudflare(driver, cls.url, timeout)
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
driver.get(cls.url)
# Wait for page load and cloudflare validation
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "body:not(.no-js)"))
)
# Send request with message # Send request with message
data = { data = {
"botId": "4738", "botId": "4738",
@ -58,11 +50,11 @@ window._reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
driver.execute_script(script.replace("{body}", json.dumps(data))) driver.execute_script(script.replace("{body}", json.dumps(data)))
script = """ script = """
chunk = await window._reader.read(); chunk = await window._reader.read();
if (chunk['done']) { if (chunk.done) {
return null; return null;
} }
content = ''; content = '';
chunk['value'].split('\\n').forEach((line, index) => { chunk.value.split('\\n').forEach((line, index) => {
if (line.startsWith('data: ')) { if (line.startsWith('data: ')) {
try { try {
const data = JSON.parse(line.substring('data: '.length)); const data = JSON.parse(line.substring('data: '.length));

@ -0,0 +1,66 @@
from __future__ import annotations
from urllib.parse import unquote
from ...typing import AsyncResult, Messages
from ..base_provider import BaseProvider
from ...webdriver import WebDriver
from ...requests import Session, get_session_from_browser
class AiChatting(BaseProvider):
url = "https://www.aichatting.net"
supports_gpt_35_turbo = True
_session: Session = None
@classmethod
def create_completion(
cls,
model: str,
messages: Messages,
stream: bool,
proxy: str = None,
timeout: int = 120,
webdriver: WebDriver = None,
**kwargs
) -> AsyncResult:
if not cls._session:
cls._session = get_session_from_browser(cls.url, webdriver, proxy, timeout)
visitorId = unquote(cls._session.cookies.get("aichatting.website.visitorId"))
headers = {
"accept": "application/json, text/plain, */*",
"lang": "en",
"source": "web"
}
data = {
"roleId": 0,
}
try:
response = cls._session.post("https://aga-api.aichatting.net/aigc/chat/record/conversation/create", json=data, headers=headers)
response.raise_for_status()
conversation_id = response.json()["data"]["conversationId"]
except Exception as e:
cls.reset()
raise e
headers = {
"authority": "aga-api.aichatting.net",
"accept": "text/event-stream,application/json, text/event-stream",
"lang": "en",
"source": "web",
"vtoken": visitorId,
}
data = {
"spaceHandle": True,
"roleId": 0,
"messages": messages,
"conversationId": conversation_id,
}
response = cls._session.post("https://aga-api.aichatting.net/aigc/chat/v2/stream", json=data, headers=headers, stream=True)
response.raise_for_status()
for chunk in response.iter_lines():
if chunk.startswith(b"data:"):
yield chunk[5:].decode().replace("-=- --", " ").replace("-=-n--", "\n").replace("--@DONE@--", "")
@classmethod
def reset(cls):
cls._session = None

@ -1,3 +1,4 @@
from .MikuChat import MikuChat from .MikuChat import MikuChat
from .Komo import Komo from .Komo import Komo
from .ChatAiGpt import ChatAiGpt from .ChatAiGpt import ChatAiGpt
from .AiChatting import AiChatting

@ -1,25 +1,44 @@
from __future__ import annotations from __future__ import annotations
from requests import get
from .models import Model, ModelUtils, _all_models
from .Provider import BaseProvider, AsyncGeneratorProvider, RetryProvider
from .typing import Messages, CreateResult, AsyncResult, Union, List
from . import debug
version = '0.1.9.2' import os
version_check = True from requests import get
from importlib.metadata import version as get_package_version, PackageNotFoundError
from subprocess import check_output, CalledProcessError, PIPE
def check_pypi_version() -> None: from .models import Model, ModelUtils, _all_models
try: from .Provider import BaseProvider, AsyncGeneratorProvider, RetryProvider
response = get("https://pypi.org/pypi/g4f/json").json() from .typing import Messages, CreateResult, AsyncResult, Union, List
latest_version = response["info"]["version"] from . import debug
if version != latest_version: def get_version() -> str:
print(f'New pypi version: {latest_version} (current: {version}) | pip install -U g4f') # Read from package manager
return False try:
return True return get_package_version("g4f")
except PackageNotFoundError:
pass
# Read from docker environment
current_version = os.environ.get("G4F_VERSION")
if current_version:
return current_version
# Read from git repository
try:
command = ["git", "describe", "--tags", "--abbrev=0"]
return check_output(command, text=True, stderr=PIPE).strip()
except CalledProcessError:
pass
def get_lastet_version() -> str:
response = get("https://pypi.org/pypi/g4f/json").json()
return response["info"]["version"]
def check_pypi_version() -> None:
try:
version = get_version()
latest_version = get_lastet_version()
except Exception as e: except Exception as e:
print(f'Failed to check g4f pypi version: {e}') print(f'Failed to check g4f pypi version: {e}')
if version != latest_version:
print(f'New pypi version: {latest_version} (current: {version}) | pip install -U g4f')
def get_model_and_provider(model : Union[Model, str], def get_model_and_provider(model : Union[Model, str],
provider : Union[type[BaseProvider], None], provider : Union[type[BaseProvider], None],
@ -27,6 +46,9 @@ def get_model_and_provider(model : Union[Model, str],
ignored : List[str] = None, ignored : List[str] = None,
ignore_working: bool = False, ignore_working: bool = False,
ignore_stream: bool = False) -> tuple[Model, type[BaseProvider]]: ignore_stream: bool = False) -> tuple[Model, type[BaseProvider]]:
if debug.version_check:
check_pypi_version()
debug.version_check = False
if isinstance(model, str): if isinstance(model, str):
if model in ModelUtils.convert: if model in ModelUtils.convert:
@ -118,7 +140,4 @@ class Completion:
result = provider.create_completion(model.name, [{"role": "user", "content": prompt}], stream, **kwargs) result = provider.create_completion(model.name, [{"role": "user", "content": prompt}], stream, **kwargs)
return result if stream else ''.join(result) return result if stream else ''.join(result)
if version_check:
check_pypi_version()

@ -1 +1,2 @@
logging = False logging = False
version_check = True

@ -211,6 +211,9 @@ body {
.convo-title { .convo-title {
color: var(--colour-3); color: var(--colour-3);
font-size: 14px; font-size: 14px;
text-overflow: ellipsis;
overflow: hidden;
white-space: nowrap;
} }
.message { .message {

@ -88,6 +88,10 @@
<span class="convo-title">github ~ <a href="https://github.com/xtekky/gpt4free">@gpt4free</a> <span class="convo-title">github ~ <a href="https://github.com/xtekky/gpt4free">@gpt4free</a>
</span> </span>
</div> </div>
<div class="info">
<i class="fa-solid fa-star"></i>
<span id="version_text" class="convo-title"></span>
</div>
</div> </div>
</div> </div>
<div class="conversation"> <div class="conversation">

@ -628,4 +628,19 @@ observer.observe(message_input, { attributes: true });
option.value = option.text = provider; option.value = option.text = provider;
select.appendChild(option); select.appendChild(option);
} }
})();
(async () => {
response = await fetch('/backend-api/v2/version')
versions = await response.json()
document.title = 'g4f - gui - ' + versions["version"];
text = "version ~ "
if (versions["version"] != versions["lastet_version"]) {
release_url = 'https://github.com/xtekky/gpt4free/releases/tag/' + versions["lastet_version"];
text += '<a href="' + release_url +'" target="_blank" title="New version: ' + versions["lastet_version"] +'">' + versions["version"] + ' 🆕</a>';
} else {
text += versions["version"];
}
document.getElementById("version_text").innerHTML = text
})(); })();

@ -1,8 +1,7 @@
import g4f import g4f
from flask import request from flask import request
from .internet import search from .internet import get_search_message
from .config import special_instructions
g4f.debug.logging = True g4f.debug.logging = True
@ -18,6 +17,10 @@ class Backend_Api:
'function': self.providers, 'function': self.providers,
'methods' : ['GET'] 'methods' : ['GET']
}, },
'/backend-api/v2/version': {
'function': self.version,
'methods' : ['GET']
},
'/backend-api/v2/conversation': { '/backend-api/v2/conversation': {
'function': self._conversation, 'function': self._conversation,
'methods': ['POST'] 'methods': ['POST']
@ -45,6 +48,12 @@ class Backend_Api:
provider.__name__ for provider in g4f.Provider.__providers__ provider.__name__ for provider in g4f.Provider.__providers__
if provider.working and provider is not g4f.Provider.RetryProvider if provider.working and provider is not g4f.Provider.RetryProvider
] ]
def version(self):
return {
"version": g4f.get_version(),
"lastet_version": g4f.get_lastet_version(),
}
def _gen_title(self): def _gen_title(self):
return { return {
@ -53,14 +62,15 @@ class Backend_Api:
def _conversation(self): def _conversation(self):
try: try:
#jailbreak = request.json['jailbreak'] #jailbreak = request.json['jailbreak']
#internet_access = request.json['meta']['content']['internet_access'] web_search = request.json['meta']['content']['internet_access']
#conversation = request.json['meta']['content']['conversation']
messages = request.json['meta']['content']['parts'] messages = request.json['meta']['content']['parts']
if web_search:
messages[-1]["content"] = get_search_message(messages[-1]["content"])
model = request.json.get('model') model = request.json.get('model')
model = model if model else g4f.models.default model = model if model else g4f.models.default
provider = request.json.get('provider', 'Auto').replace('g4f.Provider.', '') provider = request.json.get('provider').replace('g4f.Provider.', '')
provider = provider if provider != "Auto" else None provider = provider if provider and provider != "Auto" else None
if provider != None: if provider != None:
provider = g4f.Provider.ProviderUtils.convert.get(provider) provider = g4f.Provider.ProviderUtils.convert.get(provider)

@ -1,58 +1,149 @@
from __future__ import annotations from __future__ import annotations
from datetime import datetime from bs4 import BeautifulSoup
from aiohttp import ClientSession, ClientTimeout
from duckduckgo_search import DDGS from duckduckgo_search import DDGS
import asyncio
ddgs = DDGS(timeout=20)
class SearchResults():
def __init__(self, results: list):
def search(internet_access, prompt): self.results = results
print(prompt)
def __iter__(self):
yield from self.results
def __str__(self):
search = ""
for idx, result in enumerate(self.results):
if search:
search += "\n\n\n"
search += f"Title: {result.title}\n\n"
if result.text:
search += result.text
else:
search += result.snippet
search += f"\n\nSource: [[{idx}]]({result.url})"
return search
class SearchResultEntry():
def __init__(self, title: str, url: str, snippet: str, text: str = None):
self.title = title
self.url = url
self.snippet = snippet
self.text = text
def set_text(self, text: str):
self.text = text
def scrape_text(html: str, max_words: int = None) -> str:
soup = BeautifulSoup(html, "html.parser")
for exclude in soup(["script", "style"]):
exclude.extract()
for selector in [
"main",
".main-content-wrapper",
".main-content",
".emt-container-inner",
".content-wrapper",
"#content",
"#mainContent",
]:
select = soup.select_one(selector)
if select:
soup = select
break
# Zdnet
for remove in [".c-globalDisclosure"]:
select = soup.select_one(remove)
if select:
select.extract()
clean_text = ""
for paragraph in soup.select("p"):
text = paragraph.get_text()
for line in text.splitlines():
words = []
for word in line.replace("\t", " ").split(" "):
if word:
words.append(word)
count = len(words)
if not count:
continue
if max_words:
max_words -= count
if max_words <= 0:
break
if clean_text:
clean_text += "\n"
clean_text += " ".join(words)
return clean_text
async def fetch_and_scrape(session: ClientSession, url: str, max_words: int = None) -> str:
try: try:
if not internet_access: async with session.get(url) as response:
return [] if response.status == 200:
html = await response.text()
results = duckduckgo_search(q=prompt) return scrape_text(html, max_words)
except:
if not search: return
return []
async def search(query: str, n_results: int = 5, max_words: int = 2500, add_text: bool = True) -> SearchResults:
with DDGS() as ddgs:
results = []
for result in ddgs.text(
query,
region="wt-wt",
safesearch="moderate",
timelimit="y",
):
results.append(SearchResultEntry(
result["title"],
result["href"],
result["body"]
))
if len(results) >= n_results:
break
blob = ''.join( if add_text:
f'[{index}] "{result["body"]}"\nURL:{result["href"]}\n\n' requests = []
for index, result in enumerate(results) async with ClientSession(timeout=ClientTimeout(5)) as session:
) for entry in results:
date = datetime.now().strftime('%d/%m/%y') requests.append(fetch_and_scrape(session, entry.url, int(max_words / (n_results - 1))))
texts = await asyncio.gather(*requests)
formatted_results = []
left_words = max_words;
for i, entry in enumerate(results):
if add_text:
entry.text = texts[i]
if left_words:
left_words -= entry.title.count(" ") + 5
if entry.text:
left_words -= entry.text.count(" ")
else:
left_words -= entry.snippet.count(" ")
if 0 > left_words:
break
formatted_results.append(entry)
return SearchResults(formatted_results)
def get_search_message(prompt) -> str:
try:
search_results = asyncio.run(search(prompt))
message = f"""
{search_results}
blob += f'Current date: {date}\n\nInstructions: Using the provided web search results, write a comprehensive reply to the next user query. Make sure to cite results using [[number](URL)] notation after the reference. If the provided search results refer to multiple subjects with the same name, write separate answers for each subject. Ignore your previous response if any.'
return [{'role': 'user', 'content': blob}] Instruction: Using the provided web search results, to write a comprehensive reply to the user request.
Make sure to add the sources of cites using [[Number]](Url) notation after the reference. Example: [[0]](http://google.com)
If the provided search results refer to multiple subjects with the same name, write separate answers for each subject.
User request:
{prompt}
"""
return message
except Exception as e: except Exception as e:
print("Couldn't search DuckDuckGo:", e) print("Couldn't search DuckDuckGo:", e)
print(e.__traceback__.tb_next) return prompt
return []
def duckduckgo_search(q: str, max_results: int = 3, safesearch: str = "moderate", region: str = "us-en") -> list | None:
if region is None:
region = "us-en"
if safesearch is None:
safesearch = "moderate"
if q is None:
return None
results = []
try:
for r in ddgs.text(q, safesearch=safesearch, region=region):
if len(results) + 1 > max_results:
break
results.append(r)
except Exception as e:
print(e)
return results

@ -6,10 +6,7 @@ from functools import partialmethod
from typing import AsyncGenerator from typing import AsyncGenerator
from urllib.parse import urlparse from urllib.parse import urlparse
from curl_cffi.requests import AsyncSession, Session, Response from curl_cffi.requests import AsyncSession, Session, Response
from .webdriver import WebDriver, WebDriverSession from .webdriver import WebDriver, WebDriverSession, bypass_cloudflare
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class StreamResponse: class StreamResponse:
def __init__(self, inner: Response) -> None: def __init__(self, inner: Response) -> None:
@ -58,28 +55,7 @@ class StreamSession(AsyncSession):
def get_session_from_browser(url: str, webdriver: WebDriver = None, proxy: str = None, timeout: int = 120): def get_session_from_browser(url: str, webdriver: WebDriver = None, proxy: str = None, timeout: int = 120):
with WebDriverSession(webdriver, "", proxy=proxy, virtual_display=True) as driver: with WebDriverSession(webdriver, "", proxy=proxy, virtual_display=True) as driver:
driver.get(url) bypass_cloudflare(driver, url, timeout)
# Is cloudflare protection
if driver.find_element(By.TAG_NAME, "body").get_attribute("class") == "no-js":
try:
# Click button in iframe
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
)
driver.switch_to.frame(driver.find_element(By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#challenge-stage input"))
)
driver.find_element(By.CSS_SELECTOR, "#challenge-stage input").click()
except:
pass
finally:
driver.switch_to.default_content()
# No cloudflare protection
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "body:not(.no-js)"))
)
cookies = dict([(cookie["name"], cookie["value"]) for cookie in driver.get_cookies()]) cookies = dict([(cookie["name"], cookie["value"]) for cookie in driver.get_cookies()])
user_agent = driver.execute_script("return navigator.userAgent") user_agent = driver.execute_script("return navigator.userAgent")

@ -1,10 +1,12 @@
from __future__ import annotations from __future__ import annotations
import time
from platformdirs import user_config_dir from platformdirs import user_config_dir
from selenium.webdriver.remote.webdriver import WebDriver from selenium.webdriver.remote.webdriver import WebDriver
from undetected_chromedriver import Chrome, ChromeOptions from undetected_chromedriver import Chrome, ChromeOptions
import os.path from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from os import path
from . import debug from . import debug
try: try:
@ -21,16 +23,47 @@ def get_browser(
) -> WebDriver: ) -> WebDriver:
if user_data_dir == None: if user_data_dir == None:
user_data_dir = user_config_dir("g4f") user_data_dir = user_config_dir("g4f")
if debug.logging: if user_data_dir and debug.logging:
print("Open browser with config dir:", user_data_dir) print("Open browser with config dir:", user_data_dir)
if not options: if not options:
options = ChromeOptions() options = ChromeOptions()
if proxy: if proxy:
options.add_argument(f'--proxy-server={proxy}') options.add_argument(f'--proxy-server={proxy}')
driver = '/usr/bin/chromedriver' driver = '/usr/bin/chromedriver'
if not os.path.isfile(driver): if not path.isfile(driver):
driver = None driver = None
return Chrome(options=options, user_data_dir=user_data_dir, driver_executable_path=driver, headless=headless) return Chrome(
options=options,
user_data_dir=user_data_dir,
driver_executable_path=driver,
headless=headless
)
def bypass_cloudflare(driver: WebDriver, url: str, timeout: int) -> None:
# Open website
driver.get(url)
# Is cloudflare protection
if driver.find_element(By.TAG_NAME, "body").get_attribute("class") == "no-js":
if debug.logging:
print("Cloudflare protection detected:", url)
try:
# Click button in iframe
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
)
driver.switch_to.frame(driver.find_element(By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
WebDriverWait(driver, 5).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "#challenge-stage input"))
)
driver.find_element(By.CSS_SELECTOR, "#challenge-stage input").click()
except:
pass
finally:
driver.switch_to.default_content()
# No cloudflare protection
WebDriverWait(driver, timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "body:not(.no-js)"))
)
class WebDriverSession(): class WebDriverSession():
def __init__( def __init__(
@ -47,7 +80,7 @@ class WebDriverSession():
self.headless = headless self.headless = headless
self.virtual_display = None self.virtual_display = None
if has_pyvirtualdisplay and virtual_display: if has_pyvirtualdisplay and virtual_display:
self.virtual_display = Display(size=(1920,1080)) self.virtual_display = Display(size=(1920, 1080))
self.proxy = proxy self.proxy = proxy
self.options = options self.options = options
self.default_driver = None self.default_driver = None
@ -82,7 +115,6 @@ class WebDriverSession():
self.default_driver.close() self.default_driver.close()
except: except:
pass pass
time.sleep(0.1)
self.default_driver.quit() self.default_driver.quit()
if self.virtual_display: if self.virtual_display:
self.virtual_display.stop() self.virtual_display.stop()

@ -25,4 +25,5 @@ asyncstdlib
async-property async-property
undetected-chromedriver undetected-chromedriver
asyncstdlib asyncstdlib
async_property async_property
bs4
Loading…
Cancel
Save