Async Recursive URL loader (#8502)

Description: This PR improves recursive_url_loader by adding a limit on
the crawl depth and customizable extractors (functions that turn the raw
webpage into the text of the Document object), so that users can plug in
other tools to extract the webpage. This PR also includes the
documentation and tests for the new loader.
The old PR (#7756) was closed due to a project structure change.

Because socket requests are not allowed, the old unit test was removed.
Issue: N/A
Dependencies: asyncio, aiohttp
Tag maintainer: @rlancemartin
Twitter handle: @Zend_Nihility
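
The depth limit described above can be illustrated with a small, self-contained sketch. It walks a hypothetical in-memory site instead of making HTTP requests; the `SITE` dict and the `crawl` function are illustrative assumptions and are not part of this PR. The only idea taken from the diff is the early return once `depth` exceeds `max_depth`, combined with a shared `visited` set.

```python
from typing import Dict, Iterator, List, Optional, Set

# Hypothetical in-memory "website": each URL maps to the links found on that page.
SITE: Dict[str, List[str]] = {
    "/": ["/a/", "/b/"],
    "/a/": ["/a/x/"],
    "/a/x/": ["/a/x/y/"],
    "/a/x/y/": [],
    "/b/": ["/a/"],
}


def crawl(
    url: str,
    max_depth: int,
    visited: Optional[Set[str]] = None,
    depth: int = 0,
) -> Iterator[str]:
    """Yield each newly discovered child URL, stopping once depth exceeds max_depth."""
    if depth > max_depth:
        return
    # The visited set is shared across recursive calls so no URL is yielded twice.
    visited = set() if visited is None else visited
    for link in SITE.get(url, []):
        if link not in visited:
            visited.add(link)
            yield link
            yield from crawl(link, max_depth, visited, depth + 1)


for found in crawl("/", max_depth=1):
    print(found)
```

Raising `max_depth` lets the walk reach deeper pages such as `/a/x/y/`, which is the sense in which a sufficiently large value crawls the whole site.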

---------

Co-authored-by: Lance Martin <lance@langchain.dev>
This commit is contained in:
Zend 2023-08-07 07:22:31 +08:00 committed by GitHub
parent 485d716c21
commit bd4865b6fe
2 changed files with 301 additions and 166 deletions

@ -9,7 +9,7 @@
"\n",
"We may want to load all URLs under a root directory.\n",
"\n",
"For example, let's look at the [LangChain JS documentation](https://js.langchain.com/docs/).\n",
"For example, let's look at the [Python 3.9 documentation](https://docs.python.org/3.9/).\n",
"\n",
"This has many interesting child pages that we may want to read in bulk.\n",
"\n",
@ -19,13 +19,28 @@
" \n",
"We do this using the `RecursiveUrlLoader`.\n",
"\n",
"This also gives us the flexibility to exclude some children (e.g., the `api` directory with > 800 child pages)."
"This also gives us the flexibility to exclude some children, customize the extractor, and more."
]
},
{
"cell_type": "markdown",
"id": "1be8094f",
"metadata": {},
"source": [
"# Parameters\n",
"- url: str, the target url to crawl.\n",
"- exclude_dirs: Optional[List[str]], webpage directories to exclude.\n",
"- use_async: Optional[bool], whether to use async requests. Async requests are usually faster for large tasks. However, async disables lazy loading (the function still works, but it is not lazy). By default, it is set to False.\n",
"- extractor: Optional[Callable[[str], str]], a function that extracts the document text from the raw webpage. Tools such as goose3 and beautifulsoup are recommended for this. By default, it returns the page as it is.\n",
"- max_depth: Optional[int] = None, the maximum depth to crawl. By default, it is set to 2. To crawl the whole website, set it to a sufficiently large number.\n",
"- timeout: Optional[int] = None, the timeout for each request, in seconds. By default, it is set to 10.\n",
"- prevent_outside: Optional[bool] = None, whether to prevent crawling outside the root url. By default, it is set to True."
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "2e3532b2",
"execution_count": null,
"id": "23c18539",
"metadata": {},
"outputs": [],
"source": [
@ -42,13 +57,15 @@
},
{
"cell_type": "code",
"execution_count": 2,
"id": "d69e5620",
"execution_count": null,
"id": "55394afe",
"metadata": {},
"outputs": [],
"source": [
"url = \"https://js.langchain.com/docs/modules/memory/examples/\"\n",
"loader = RecursiveUrlLoader(url=url)\n",
"from bs4 import BeautifulSoup as Soup\n",
"\n",
"url = \"https://docs.python.org/3.9/\"\n",
"loader = RecursiveUrlLoader(url=url, max_depth=2, extractor=lambda x: Soup(x, \"html.parser\").text)\n",
"docs = loader.load()"
]
},
@ -61,7 +78,7 @@
{
"data": {
"text/plain": [
"12"
"'\\n\\n\\n\\n\\nPython Frequently Asked Questions — Python 3.'"
]
},
"execution_count": 3,
@ -70,19 +87,21 @@
}
],
"source": [
"len(docs)"
"docs[0].page_content[:50]"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "89355b7c",
"id": "13bd7e16",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'\\n\\n\\n\\n\\nBuffer Window Memory | 🦜️🔗 Langchain\\n\\n\\n\\n\\n\\nSki'"
"{'source': 'https://docs.python.org/3.9/library/index.html',\n",
" 'title': 'The Python Standard Library — Python 3.9.17 documentation',\n",
" 'language': None}"
]
},
"execution_count": 4,
@ -91,137 +110,48 @@
}
],
"source": [
"docs[0].page_content[:50]"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "13bd7e16",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'source': 'https://js.langchain.com/docs/modules/memory/examples/buffer_window_memory',\n",
" 'title': 'Buffer Window Memory | 🦜️🔗 Langchain',\n",
" 'description': 'BufferWindowMemory keeps track of the back-and-forths in conversation, and then uses a window of size k to surface the last k back-and-forths to use as memory.',\n",
" 'language': 'en'}"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"docs[0].metadata"
"docs[-1].metadata"
]
},
{
"cell_type": "markdown",
"id": "40fc13ef",
"id": "5866e5a6",
"metadata": {},
"source": [
"Now, let's try a more extensive example, the `docs` root dir.\n",
"\n",
"We will skip everything under `api`.\n",
"\n",
"For this, we can `lazy_load` each page as we crawl the tree, using `WebBaseLoader` to load each as we go."
"However, since a perfect filter is hard to achieve, the output may still contain some irrelevant documents. You can filter the returned documents yourself if needed. Most of the time, the returned results are good enough."
]
},
{
"cell_type": "markdown",
"id": "4ec8ecef",
"metadata": {},
"source": [
"Testing on LangChain docs."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5c938b9f",
"execution_count": 2,
"id": "349b5598",
"metadata": {},
"outputs": [],
"source": [
"url = \"https://js.langchain.com/docs/\"\n",
"exclude_dirs = [\"https://js.langchain.com/docs/api/\"]\n",
"loader = RecursiveUrlLoader(url=url, exclude_dirs=exclude_dirs)\n",
"# Lazy load each\n",
"docs = [print(doc) or doc for doc in loader.lazy_load()]"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "30ff61d3",
"metadata": {},
"outputs": [],
"source": [
"# Load all pages\n",
"docs = loader.load()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "457e30f3",
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"188"
"8"
]
},
"execution_count": 8,
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"url = \"https://js.langchain.com/docs/modules/memory/integrations/\"\n",
"loader = RecursiveUrlLoader(url=url)\n",
"docs = loader.load()\n",
"len(docs)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "bca80b4a",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'\\n\\n\\n\\n\\nAgent Simulations | 🦜️🔗 Langchain\\n\\n\\n\\n\\n\\nSkip t'"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"docs[0].page_content[:50]"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "df97cf22",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{'source': 'https://js.langchain.com/docs/use_cases/agent_simulations/',\n",
" 'title': 'Agent Simulations | 🦜️🔗 Langchain',\n",
" 'description': 'Agent simulations involve taking multiple agents and having them interact with each other.',\n",
" 'language': 'en'}"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"docs[0].metadata"
]
}
],
"metadata": {

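The `extractor` parameter documented in the notebook above is any callable that maps the raw HTML string to the text stored in the document. The notebook uses BeautifulSoup for this; the following is a minimal sketch of an alternative that relies only on the standard library. The names `_TextCollector` and `simple_extractor` are hypothetical and do not appear in this PR.

```python
from html.parser import HTMLParser
from typing import List


class _TextCollector(HTMLParser):
    """Collect text nodes, skipping the contents of <script> and <style>."""

    def __init__(self) -> None:
        super().__init__()
        self.parts: List[str] = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth > 0:
            self._skip_depth -= 1

    def handle_data(self, data):
        # Keep only non-empty text that is outside script/style elements.
        if self._skip_depth == 0 and data.strip():
            self.parts.append(data.strip())


def simple_extractor(raw_html: str) -> str:
    """Return the text of the page as a single space-separated string."""
    parser = _TextCollector()
    parser.feed(raw_html)
    parser.close()
    return " ".join(parser.parts)


sample = (
    "<html><head><title>Docs</title><style>p {color: red}</style></head>"
    "<body><p>Hello <b>world</b></p><script>var x = 1;</script></body></html>"
)
print(simple_extractor(sample))
```

Given the constructor signature in this diff, such a function would be passed as `RecursiveUrlLoader(url=url, extractor=simple_extractor)`.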
@ -1,4 +1,6 @@
from typing import Iterator, List, Optional, Set
import asyncio
import re
from typing import Callable, Iterator, List, Optional, Set, Union
from urllib.parse import urljoin, urlparse
import requests
@ -13,20 +15,117 @@ class RecursiveUrlLoader(BaseLoader):
def __init__(
self,
url: str,
max_depth: Optional[int] = None,
use_async: Optional[bool] = None,
extractor: Optional[Callable[[str], str]] = None,
exclude_dirs: Optional[List[str]] = None,
timeout: Optional[int] = None,
prevent_outside: Optional[bool] = None,
) -> None:
"""Initialize with URL to crawl and any subdirectories to exclude.
Args:
url: The URL to crawl.
exclude_dirs: A list of subdirectories to exclude.
use_async: Whether to use asynchronous loading.
If use_async is True, loading is not lazy,
but the results are otherwise the same.
extractor: A function that extracts the text from the HTML.
When it returns an empty string, the document is ignored.
max_depth: The maximum depth of the recursive loading.
timeout: The timeout for each request, in seconds.
prevent_outside: Whether to prevent crawling outside the root URL.
"""
self.url = url
self.exclude_dirs = exclude_dirs
self.use_async = use_async if use_async is not None else False
self.extractor = extractor if extractor is not None else lambda x: x
self.max_depth = max_depth if max_depth is not None else 2
self.timeout = timeout if timeout is not None else 10
self.prevent_outside = prevent_outside if prevent_outside is not None else True
def get_child_links_recursive(
self, url: str, visited: Optional[Set[str]] = None
def _get_sub_links(self, raw_html: str, base_url: str) -> List[str]:
"""Extract all the links from the raw HTML
and convert them into absolute paths.
Args:
raw_html (str): original html
base_url (str): the base url of the html
Returns:
List[str]: sub links
"""
# Get all links that are relative to the root of the website
all_links = re.findall(r"href=[\"\'](.*?)[\"\']", raw_html)
absolute_paths = []
invalid_prefixes = ("javascript:", "mailto:", "#")
invalid_suffixes = (
".css",
".js",
".ico",
".png",
".jpg",
".jpeg",
".gif",
".svg",
)
# Process the links
for link in all_links:
# Ignore blacklisted patterns
# like javascript: or mailto:, files of svg, ico, css, js
if link.startswith(invalid_prefixes) or link.endswith(invalid_suffixes):
continue
# Some may be absolute links like https://to/path
if link.startswith("http"):
if (not self.prevent_outside) or (
self.prevent_outside and link.startswith(base_url)
):
absolute_paths.append(link)
else:
absolute_paths.append(urljoin(base_url, link))
# Some may be relative links like /to/path
if link.startswith("/") and not link.startswith("//"):
absolute_paths.append(urljoin(base_url, link))
continue
# Some may have omitted the protocol like //to/path
if link.startswith("//"):
absolute_paths.append(f"{urlparse(base_url).scheme}:{link}")
continue
# Remove duplicates
# also do another filter to prevent outside links
absolute_paths = list(
set(
[
path
for path in absolute_paths
if not self.prevent_outside
or path.startswith(base_url)
and path != base_url
]
)
)
return absolute_paths
def _gen_metadata(self, raw_html: str, url: str) -> dict:
"""Build metadata from BeautifulSoup output."""
try:
from bs4 import BeautifulSoup
except ImportError:
raise ImportError(
"The bs4 package is required for the RecursiveUrlLoader. "
"Please install it with `pip install bs4`."
)
metadata = {"source": url}
soup = BeautifulSoup(raw_html, "html.parser")
if title := soup.find("title"):
metadata["title"] = title.get_text()
if description := soup.find("meta", attrs={"name": "description"}):
metadata["description"] = description.get("content", None)
if html := soup.find("html"):
metadata["language"] = html.get("lang", None)
return metadata
def _get_child_links_recursive(
self, url: str, visited: Optional[Set[str]] = None, depth: int = 0
) -> Iterator[Document]:
"""Recursively get all child links starting with the path of the input URL.
@ -35,26 +134,12 @@ class RecursiveUrlLoader(BaseLoader):
visited: A set of visited URLs.
"""
from langchain.document_loaders import WebBaseLoader
try:
from bs4 import BeautifulSoup
except ImportError:
raise ImportError(
"The BeautifulSoup package is required for the RecursiveUrlLoader."
)
# Construct the base and parent URLs
parsed_url = urlparse(url)
base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
parent_url = "/".join(parsed_url.path.split("/")[:-1])
current_path = parsed_url.path
if depth > self.max_depth:
return []
# Add a trailing slash if not present
if not base_url.endswith("/"):
base_url += "/"
if not parent_url.endswith("/"):
parent_url += "/"
if not url.endswith("/"):
url += "/"
# Exclude the root and parent from a list
visited = set() if visited is None else visited
@ -63,42 +148,162 @@ class RecursiveUrlLoader(BaseLoader):
if self.exclude_dirs and any(
url.startswith(exclude_dir) for exclude_dir in self.exclude_dirs
):
return visited
return []
# Get all links that are relative to the root of the website
response = requests.get(url)
soup = BeautifulSoup(response.text, "html.parser")
all_links = [link.get("href") for link in soup.find_all("a")]
# Get all links that can be accessed from the current URL
try:
response = requests.get(url, timeout=self.timeout)
except Exception:
return []
# Extract only the links that are children of the current URL
child_links = list(
{
link
for link in all_links
if link and link.startswith(current_path) and link != current_path
}
)
# Get absolute path for all root relative links listed
absolute_paths = [urljoin(base_url, link) for link in child_links]
absolute_paths = self._get_sub_links(response.text, url)
# Store the visited links and recursively visit the children
for link in absolute_paths:
# Check all unvisited links
if link not in visited:
visited.add(link)
loaded_link = WebBaseLoader(link).load()
if isinstance(loaded_link, list):
yield from loaded_link
else:
yield loaded_link
yield from self.get_child_links_recursive(link, visited)
return visited
try:
response = requests.get(link, timeout=self.timeout)
text = response.text
except Exception:
# unreachable link, so just ignore it
continue
loaded_link = Document(
page_content=self.extractor(text),
metadata=self._gen_metadata(text, link),
)
yield loaded_link
# If the link is a directory (w/ children) then visit it
if link.endswith("/"):
yield from self._get_child_links_recursive(link, visited, depth + 1)
return []
async def _async_get_child_links_recursive(
self, url: str, visited: Optional[Set[str]] = None, depth: int = 0
) -> List[Document]:
"""Recursively get all child links starting with the path of the input URL.
Args:
url: The URL to crawl.
visited: A set of visited URLs.
depth: The number of pages visited to reach the current URL.
"""
try:
import aiohttp
except ImportError:
raise ImportError(
"The aiohttp package is required for the RecursiveUrlLoader. "
"Please install it with `pip install aiohttp`."
)
if depth > self.max_depth:
return []
# Add a trailing slash if not present
if not url.endswith("/"):
url += "/"
# Exclude the root and parent from a list
visited = set() if visited is None else visited
# Exclude the links that start with any of the excluded directories
if self.exclude_dirs and any(
url.startswith(exclude_dir) for exclude_dir in self.exclude_dirs
):
return []
# Disable SSL verification because some websites have invalid SSL certificates.
# The loader only reads pages, so this is treated as acceptable here.
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False),
timeout=aiohttp.ClientTimeout(self.timeout),
) as session:
# Some url may be invalid, so catch the exception
response: aiohttp.ClientResponse
try:
response = await session.get(url)
text = await response.text()
except aiohttp.client_exceptions.InvalidURL:
return []
# There may be some other exceptions, so catch them,
# we don't want to stop the whole process
except Exception:
return []
absolute_paths = self._get_sub_links(text, url)
# Worker will be only called within the current function
# Worker function will process the link
# then recursively call get_child_links_recursive to process the children
async def worker(link: str) -> Union[Document, None]:
try:
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(ssl=False),
timeout=aiohttp.ClientTimeout(self.timeout),
) as session:
response = await session.get(link)
text = await response.text()
extracted = self.extractor(text)
if len(extracted) > 0:
return Document(
page_content=extracted,
metadata=self._gen_metadata(text, link),
)
else:
return None
# Despite the fact that we have filtered some links,
# there may still be some invalid links, so catch the exception
except aiohttp.client_exceptions.InvalidURL:
return None
# There may be some other exceptions, so catch them,
# we don't want to stop the whole process
except Exception:
# print(e)
return None
# The coroutines that will be executed
tasks = []
# Generate the tasks
for link in absolute_paths:
# Check all unvisited links
if link not in visited:
visited.add(link)
tasks.append(worker(link))
# Get the not None results
results = list(
filter(lambda x: x is not None, await asyncio.gather(*tasks))
)
# Recursively call the function to get the children of the children
sub_tasks = []
for link in absolute_paths:
sub_tasks.append(
self._async_get_child_links_recursive(link, visited, depth + 1)
)
# Each sub-task returns a list of documents,
# so flatten the lists returned by asyncio.gather(*sub_tasks)
flattened = []
next_results = await asyncio.gather(*sub_tasks)
for sub_result in next_results:
if isinstance(sub_result, Exception):
# We don't want to stop the whole process, so just ignore it
# Not standard html format or invalid url or 404 may cause this
# But we can't do anything about it.
continue
if sub_result is not None:
flattened += sub_result
results += flattened
return list(filter(lambda x: x is not None, results))
def lazy_load(self) -> Iterator[Document]:
"""Lazy load web pages."""
return self.get_child_links_recursive(self.url)
"""Lazy load web pages.
When use_async is True, all pages are loaded eagerly and
an iterator over the complete result is returned."""
if self.use_async:
results = asyncio.run(self._async_get_child_links_recursive(self.url))
if results is None:
return iter([])
else:
return iter(results)
else:
return self._get_child_links_recursive(self.url)
def load(self) -> List[Document]:
"""Load web pages."""
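
The link handling in `_get_sub_links` above can be illustrated with a simplified, self-contained sketch. This is not the PR's implementation: it assumes the hrefs have already been extracted, always keeps the crawl inside `base_url` (the behavior of `prevent_outside=True`), and passes every link through `urljoin`, which resolves relative, root-relative, absolute, and protocol-relative references. The function name `resolve_links` and the `example.com` URLs are hypothetical.

```python
from typing import List
from urllib.parse import urljoin


def resolve_links(hrefs: List[str], base_url: str) -> List[str]:
    """Resolve raw href values against base_url and keep only in-scope pages."""
    skip_prefixes = ("javascript:", "mailto:", "#")
    skip_suffixes = (".css", ".js", ".ico", ".png", ".jpg", ".jpeg", ".gif", ".svg")
    resolved: List[str] = []
    for href in hrefs:
        # Drop non-page links and static assets.
        if href.startswith(skip_prefixes) or href.endswith(skip_suffixes):
            continue
        absolute = urljoin(base_url, href)
        # Keep only links under base_url, excluding base_url itself and duplicates.
        if (
            absolute.startswith(base_url)
            and absolute != base_url
            and absolute not in resolved
        ):
            resolved.append(absolute)
    return resolved


hrefs = [
    "guide/",
    "/docs/api/",
    "https://example.com/docs/faq.html",
    "//cdn.example.com/lib/",
    "mailto:someone@example.com",
    "style.css",
    "#top",
    "https://other.org/",
    "guide/",
]
for link in resolve_links(hrefs, "https://example.com/docs/"):
    print(link)
```

Unlike the set-based deduplication in the diff, this sketch preserves the order in which links are first seen, which makes the output deterministic.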