"""Web base loader class."""
|
|
import asyncio
|
|
import logging
|
|
from typing import Any, List, Optional, Union
|
|
|
|
import aiohttp
|
|
import requests
|
|
|
|
from langchain.docstore.document import Document
|
|
from langchain.document_loaders.base import BaseLoader
|
|
|
|
logger = logging.getLogger(__file__)
|
|
|
|
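# Browser-like headers used as the fallback when no header_template is passed to
# WebBaseLoader.__init__; the empty User-Agent is filled in at construction time
# when the optional fake_useragent package is installed.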
default_header_template = {
    "User-Agent": "",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*"
    ";q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
    "Referer": "https://www.google.com/",
    "DNT": "1",
    "Connection": "keep-alive",
    "Upgrade-Insecure-Requests": "1",
}


def _build_metadata(soup: Any, url: str) -> dict:
    """Build metadata from BeautifulSoup output."""
    metadata = {"source": url}
    if title := soup.find("title"):
        metadata["title"] = title.get_text()
    if description := soup.find("meta", attrs={"name": "description"}):
        metadata["description"] = description.get("content", None)
    if html := soup.find("html"):
        metadata["language"] = html.get("lang", None)
    return metadata


class WebBaseLoader(BaseLoader):
    """Loader that uses urllib and beautiful soup to load webpages."""

    web_paths: List[str]

    requests_per_second: int = 2
    """Max number of concurrent requests to make."""

    default_parser: str = "html.parser"
    """Default parser to use for BeautifulSoup."""

    def __init__(
        self, web_path: Union[str, List[str]], header_template: Optional[dict] = None
    ):
        """Initialize with webpage path."""

        # TODO: Deprecate web_path in favor of web_paths, and remove this.
        # Left like this because a number of loaders still expect a single URL.
        if isinstance(web_path, str):
            self.web_paths = [web_path]
        elif isinstance(web_path, List):
            self.web_paths = web_path

        self.session = requests.Session()
        try:
            import bs4  # noqa:F401
        except ImportError:
            raise ValueError(
                "bs4 package not found, please install it with `pip install bs4`"
            )

        try:
            from fake_useragent import UserAgent

            headers = header_template or default_header_template
            headers["User-Agent"] = UserAgent().random
            self.session.headers = dict(headers)
        except ImportError:
            logger.info(
                "fake_useragent not found, using default user agent. "
                "To get a realistic header for requests, `pip install fake_useragent`."
            )

    @property
    def web_path(self) -> str:
        if len(self.web_paths) > 1:
            raise ValueError("Multiple webpaths found.")
        return self.web_paths[0]

    async def _fetch(self, url: str) -> str:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=self.session.headers) as response:
                return await response.text()

    async def _fetch_with_rate_limit(
        self, url: str, semaphore: asyncio.Semaphore
    ) -> str:
        async with semaphore:
            return await self._fetch(url)

    async def fetch_all(self, urls: List[str]) -> Any:
        """Fetch all urls concurrently with rate limiting."""
        # The semaphore caps how many requests are in flight at once.
        semaphore = asyncio.Semaphore(self.requests_per_second)
        tasks = []
        for url in urls:
            task = asyncio.ensure_future(self._fetch_with_rate_limit(url, semaphore))
            tasks.append(task)
        return await asyncio.gather(*tasks)

    @staticmethod
    def _check_parser(parser: str) -> None:
        """Check that parser is valid for bs4."""
        valid_parsers = ["html.parser", "lxml", "xml", "lxml-xml", "html5lib"]
        if parser not in valid_parsers:
            raise ValueError(
                "`parser` must be one of " + ", ".join(valid_parsers) + "."
            )

    def scrape_all(self, urls: List[str], parser: Union[str, None] = None) -> List[Any]:
        """Fetch all urls, then return soups for all results."""
        from bs4 import BeautifulSoup

        results = asyncio.run(self.fetch_all(urls))
        final_results = []
        for i, result in enumerate(results):
            url = urls[i]
            if parser is None:
                if url.endswith(".xml"):
                    parser = "xml"
                else:
                    parser = self.default_parser
                self._check_parser(parser)
            final_results.append(BeautifulSoup(result, parser))

        return final_results

    def _scrape(self, url: str, parser: Union[str, None] = None) -> Any:
        from bs4 import BeautifulSoup

        if parser is None:
            if url.endswith(".xml"):
                parser = "xml"
            else:
                parser = self.default_parser

        self._check_parser(parser)

        html_doc = self.session.get(url)
        return BeautifulSoup(html_doc.text, parser)

    def scrape(self, parser: Union[str, None] = None) -> Any:
        """Scrape data from webpage and return it in BeautifulSoup format."""

        if parser is None:
            parser = self.default_parser

        return self._scrape(self.web_path, parser)

    def load(self) -> List[Document]:
        """Load text from the url(s) in web_path."""
        docs = []
        for path in self.web_paths:
            soup = self._scrape(path)
            text = soup.get_text()
            metadata = _build_metadata(soup, path)
            docs.append(Document(page_content=text, metadata=metadata))

        return docs

    def aload(self) -> List[Document]:
        """Load text from the urls in web_path asynchronously into Documents."""

        results = self.scrape_all(self.web_paths)
        docs = []
        for i in range(len(results)):
            soup = results[i]
            text = soup.get_text()
            metadata = _build_metadata(soup, self.web_paths[i])
            docs.append(Document(page_content=text, metadata=metadata))

        return docs
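

if __name__ == "__main__":
    # Minimal usage sketch: fetch two illustrative URLs concurrently via aload().
    # Requires network access plus the bs4 and aiohttp dependencies; the URLs
    # below are placeholders.
    demo_loader = WebBaseLoader(
        ["https://www.example.com/", "https://www.example.org/"]
    )
    for doc in demo_loader.aload():
        print(doc.metadata["source"], len(doc.page_content))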