You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
langchain/libs/text-splitters/langchain_text_splitters/html.py

319 lines
11 KiB
Python

from __future__ import annotations
import copy
import pathlib
from io import BytesIO, StringIO
from typing import Any, Dict, Iterable, List, Optional, Tuple, TypedDict, cast
import requests
from langchain_core.documents import Document
from langchain_text_splitters.character import RecursiveCharacterTextSplitter
class ElementType(TypedDict):
    """Typed dictionary describing a single element extracted from HTML."""

    # Source the element was read from.
    url: str
    # XPath text associated with the element.
    xpath: str
    # Text content of the element.
    content: str
    # Header metadata attached to the element (metadata key -> header text).
    metadata: Dict[str, str]
class HTMLHeaderTextSplitter:
    """Split HTML content into documents based on specified headers.

    Requires the ``lxml`` package.
    """

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]],
        return_each_element: bool = False,
    ):
        """Create a new HTMLHeaderTextSplitter.

        Args:
            headers_to_split_on: list of tuples of headers we want to track mapped to
                (arbitrary) keys for metadata. Allowed header values: h1, h2, h3, h4,
                h5, h6 e.g. [("h1", "Header 1"), ("h2", "Header 2")].
            return_each_element: Return each element w/ associated headers.
        """
        # Output element-by-element or aggregated into chunks w/ common headers
        self.return_each_element = return_each_element
        # Stored sorted so the header order is deterministic.
        self.headers_to_split_on = sorted(headers_to_split_on)

    @staticmethod
    def _merge_adjacent_elements(elements: List[ElementType]) -> List[ElementType]:
        """Merge consecutive elements that share identical metadata.

        The input list and its dictionaries are left untouched: every merged
        entry is a shallow copy, so concatenating content never leaks back
        into the caller's elements.
        """
        merged: List[ElementType] = []
        for element in elements:
            if merged and merged[-1]["metadata"] == element["metadata"]:
                # Same headers as the previous chunk: extend its content.
                merged[-1]["content"] += " \n" + element["content"]
            else:
                # Start a new chunk from a copy so the `+=` above cannot
                # mutate the caller's element.
                merged.append(copy.copy(element))
        return merged

    def aggregate_elements_to_chunks(
        self, elements: List[ElementType]
    ) -> List[Document]:
        """Combine elements with common metadata into chunks.

        Args:
            elements: HTML element content with associated identifying info
                and metadata.

        Returns:
            One Document per run of consecutive elements with equal metadata;
            the contents of a run are joined with ``" \\n"``.
        """
        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in self._merge_adjacent_elements(elements)
        ]

    def split_text_from_url(
        self, url: str, timeout: Optional[float] = 10, **kwargs: Any
    ) -> List[Document]:
        """Split HTML fetched from a web URL.

        Args:
            url: web URL
            timeout: timeout passed to ``requests.get``; without one a stalled
                server would block the call indefinitely.
            **kwargs: additional keyword arguments forwarded to ``requests.get``.

        Returns:
            The documents produced by splitting the response body.
        """
        r = requests.get(url, timeout=timeout, **kwargs)
        return self.split_text_from_file(BytesIO(r.content))

    def split_text(self, text: str) -> List[Document]:
        """Split an HTML text string.

        Args:
            text: HTML text
        """
        return self.split_text_from_file(StringIO(text))

    def split_text_from_file(self, file: Any) -> List[Document]:
        """Split an HTML file.

        Args:
            file: HTML file; it is handed to ``lxml.etree.parse``.

        Returns:
            Documents aggregated by common headers, or one Document per
            element when ``return_each_element`` is set.

        Raises:
            ImportError: if ``lxml`` is not installed.
        """
        try:
            from lxml import etree
        except ImportError as e:
            raise ImportError(
                "Unable to import lxml, please install with `pip install lxml`."
            ) from e
        # use lxml library to parse html document and return xml ElementTree
        # Explicitly encoding in utf-8 allows non-English
        # html files to be processed without garbled characters
        parser = etree.HTMLParser(encoding="utf-8")
        tree = etree.parse(file, parser)

        # document transformation for "structure-aware" chunking is handled with xsl.
        # see comments in html_chunks_with_headers.xslt for more detailed information.
        xslt_path = pathlib.Path(__file__).parent / "xsl/html_chunks_with_headers.xslt"
        xslt_tree = etree.parse(xslt_path)
        transform = etree.XSLT(xslt_tree)
        result = transform(tree)
        result_dom = etree.fromstring(str(result))

        # Mapping from header tag to metadata key; it also serves as the
        # filter selecting which header tags are recorded.
        header_mapping = dict(self.headers_to_split_on)

        # map xhtml namespace prefix
        ns_map = {"h": "http://www.w3.org/1999/xhtml"}

        # build list of elements from DOM
        elements: List[ElementType] = []
        for element in result_dom.findall("*//*", ns_map):
            if not (
                element.findall("*[@class='headers']")
                or element.findall("*[@class='chunk']")
            ):
                continue
            xpath_nodes = element.findall("*[@class='xpath']", ns_map)
            chunk_nodes = element.findall("*[@class='chunk']", ns_map)
            header_nodes = element.findall("*[@class='headers']/*", ns_map)
            elements.append(
                ElementType(
                    url=file,
                    xpath="".join(node.text or "" for node in xpath_nodes),
                    content="".join(node.text or "" for node in chunk_nodes),
                    # Add text of specified headers to metadata using the
                    # header mapping.
                    metadata={
                        header_mapping[node.tag]: node.text or ""
                        for node in header_nodes
                        if node.tag in header_mapping
                    },
                )
            )

        if not self.return_each_element:
            return self.aggregate_elements_to_chunks(elements)
        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in elements
        ]
class HTMLSectionSplitter:
    """Split HTML content into sections based on specified header tags.

    An XSLT stylesheet is applied to the HTML before it is split by headers.
    Requires the ``lxml`` and ``bs4`` packages.
    """

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]],
        xslt_path: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Create a new HTMLSectionSplitter.

        Args:
            headers_to_split_on: list of tuples of headers we want to track mapped to
                (arbitrary) keys for metadata. Allowed header values: h1, h2, h3, h4,
                h5, h6 e.g. [("h1", "Header 1"), ("h2", "Header 2")].
            xslt_path: path to xslt file for document transformation.
                Uses a default if not passed.
                Needed for html contents that use different formats and layouts.
            **kwargs: keyword arguments forwarded to
                ``RecursiveCharacterTextSplitter`` in ``split_documents``.
        """
        self.headers_to_split_on = dict(headers_to_split_on)

        if xslt_path is None:
            self.xslt_path = (
                pathlib.Path(__file__).parent / "xsl/converting_to_header.xslt"
            ).absolute()
        else:
            self.xslt_path = pathlib.Path(xslt_path).absolute()
        self.kwargs = kwargs

    def split_documents(self, documents: Iterable[Document]) -> List[Document]:
        """Split documents by section, then by character length.

        Each document is first split into header sections; the resulting
        documents are then passed through a ``RecursiveCharacterTextSplitter``
        built from the keyword arguments given to the constructor.
        """
        texts, metadatas = [], []
        for doc in documents:
            texts.append(doc.page_content)
            metadatas.append(doc.metadata)
        results = self.create_documents(texts, metadatas=metadatas)

        text_splitter = RecursiveCharacterTextSplitter(**self.kwargs)

        return text_splitter.split_documents(results)

    def split_text(self, text: str) -> List[Document]:
        """Split an HTML text string.

        Args:
            text: HTML text
        """
        return self.split_text_from_file(StringIO(text))

    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None
    ) -> List[Document]:
        """Create documents from a list of texts.

        Args:
            texts: HTML strings to split.
            metadatas: optional per-text metadata, merged into every document
                produced from the corresponding text. Section metadata wins on
                key collisions.

        Returns:
            One Document per section found in each text.
        """
        _metadatas = metadatas or [{}] * len(texts)
        documents = []
        for i, text in enumerate(texts):
            for chunk in self.split_text(text):
                metadata = copy.deepcopy(_metadatas[i])

                for key, value in chunk.metadata.items():
                    if value == "#TITLE#":
                        # Substitute the document title when one is known;
                        # keep the placeholder instead of raising KeyError
                        # when the source metadata has no "Title".
                        chunk.metadata[key] = metadata.get("Title", "#TITLE#")
                metadata = {**metadata, **chunk.metadata}
                new_doc = Document(page_content=chunk.page_content, metadata=metadata)
                documents.append(new_doc)
        return documents

    def split_html_by_headers(
        self, html_doc: str
    ) -> Dict[str, Dict[str, Optional[str]]]:
        """Split an HTML string into sections delimited by the tracked headers.

        Args:
            html_doc: HTML text.

        Returns:
            Mapping from header text to a dict with the section ``"content"``
            and the ``"tag_name"`` of its header. The first matched element is
            reported under the ``"#TITLE#"`` key with tag ``"h1"``. Sections
            whose content is empty are omitted.

        Raises:
            ImportError: if ``bs4`` is not installed.
        """
        try:
            from bs4 import BeautifulSoup, PageElement  # type: ignore[import-untyped]
        except ImportError as e:
            raise ImportError(
                "Unable to import BeautifulSoup/PageElement, "
                "please install with `pip install bs4`."
            ) from e

        soup = BeautifulSoup(html_doc, "html.parser")
        header_names = list(self.headers_to_split_on.keys())
        sections: Dict[str, Dict[str, Optional[str]]] = {}

        headers = soup.find_all(["body"] + header_names)

        for i, header in enumerate(headers):
            header_element: PageElement = header
            if i == 0:
                current_header = "#TITLE#"
                current_header_tag = "h1"
            else:
                current_header = header_element.text.strip()
                current_header_tag = header_element.name
            section_content: List[str] = []

            # Collect every text node up to (not including) the next header.
            for element in header_element.next_elements:
                if i + 1 < len(headers) and element == headers[i + 1]:
                    break
                if isinstance(element, str):
                    section_content.append(element)
            content = " ".join(section_content).strip()

            if content != "":
                # NOTE: sections are keyed by header text, so a later section
                # with the same header text replaces an earlier one.
                sections[current_header] = {
                    "content": content,
                    "tag_name": current_header_tag,
                }

        return sections

    def convert_possible_tags_to_header(self, html_content: str) -> str:
        """Apply the configured XSLT stylesheet to the HTML content.

        Args:
            html_content: HTML text.

        Returns:
            The transformed document as a string, or ``html_content`` unchanged
            when no stylesheet path is set.

        Raises:
            ImportError: if ``lxml`` is not installed.
        """
        if self.xslt_path is None:
            return html_content

        try:
            from lxml import etree
        except ImportError as e:
            raise ImportError(
                "Unable to import lxml, please install with `pip install lxml`."
            ) from e
        # use lxml library to parse html document and return xml ElementTree
        parser = etree.HTMLParser()
        tree = etree.parse(StringIO(html_content), parser)

        xslt_tree = etree.parse(self.xslt_path)
        transform = etree.XSLT(xslt_tree)
        result = transform(tree)
        return str(result)

    @staticmethod
    def _read_file_content(file: Any) -> str:
        """Return the HTML text held by a path or file-like object.

        Supports filesystem paths, in-memory buffers exposing ``getvalue()``
        and any object with ``read()``. Bytes are decoded as UTF-8.
        """
        if isinstance(file, (str, pathlib.Path)):
            return pathlib.Path(file).read_text(encoding="utf-8")
        # Prefer getvalue() so an in-memory buffer is read in full regardless
        # of its current position.
        content = file.getvalue() if hasattr(file, "getvalue") else file.read()
        if isinstance(content, bytes):
            content = content.decode("utf-8")
        return content

    def split_text_from_file(self, file: Any) -> List[Document]:
        """Split an HTML file.

        Args:
            file: HTML file, given as a path, an in-memory buffer or any
                object with a ``read()`` method.

        Returns:
            One Document per section, with metadata mapping the configured key
            for the section's header tag to the header text.
        """
        file_content = self._read_file_content(file)
        file_content = self.convert_possible_tags_to_header(file_content)
        sections = self.split_html_by_headers(file_content)

        documents = []
        for section_key, section in sections.items():
            tag_name = str(section["tag_name"])
            # The leading section is always tagged "h1", which may not be one
            # of the tracked headers; fall back to the tag name rather than
            # raising KeyError.
            metadata_key = self.headers_to_split_on.get(tag_name, tag_name)
            documents.append(
                Document(
                    cast(str, section["content"]),
                    metadata={metadata_key: section_key},
                )
            )
        return documents