mirror of
https://github.com/hwchase17/langchain
synced 2024-11-18 09:25:54 +00:00
2cd907ad7e
Description: MarkdownHeaderTextSplitter fails to parse headers that are preceded by non-printable characters. More details: #20643.

The following is the official test case. Just replacing `# Foo\n\n` with `\ufeff# Foo\n\n` causes the test case to fail: the chunk metadata is empty.

```python
def test_md_header_text_splitter_1() -> None:
    """Test markdown splitter by header: Case 1."""
    markdown_document = (
        "\ufeff# Foo\n\n"
        " ## Bar\n\n"
        "Hi this is Jim\n\n"
        "Hi this is Joe\n\n"
        " ## Baz\n\n"
        " Hi this is Molly"
    )
    headers_to_split_on = [
        ("#", "Header 1"),
        ("##", "Header 2"),
    ]
    markdown_splitter = MarkdownHeaderTextSplitter(
        headers_to_split_on=headers_to_split_on,
    )
    output = markdown_splitter.split_text(markdown_document)
    expected_output = [
        Document(
            page_content="Hi this is Jim \nHi this is Joe",
            metadata={"Header 1": "Foo", "Header 2": "Bar"},
        ),
        Document(
            page_content="Hi this is Molly",
            metadata={"Header 1": "Foo", "Header 2": "Baz"},
        ),
    ]
    assert output == expected_output
```

twitter: @coolbeevip

Co-authored-by: Bagatur <22008038+baskaryan@users.noreply.github.com>
224 lines
9.0 KiB
Python
224 lines
9.0 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, List, Tuple, TypedDict
|
|
|
|
from langchain_core.documents import Document
|
|
|
|
from langchain_text_splitters.base import Language
|
|
from langchain_text_splitters.character import RecursiveCharacterTextSplitter
|
|
|
|
|
|
class MarkdownTextSplitter(RecursiveCharacterTextSplitter):
    """Recursive character splitter preconfigured with the Markdown separators."""

    def __init__(self, **kwargs: Any) -> None:
        """Build the splitter using the separators registered for Markdown.

        Args:
            **kwargs: Forwarded unchanged to ``RecursiveCharacterTextSplitter``.
        """
        super().__init__(
            separators=self.get_separators_for_language(Language.MARKDOWN),
            **kwargs,
        )
|
|
|
|
|
|
class MarkdownHeaderTextSplitter:
    """Split Markdown text on configured header markers.

    Each piece of content is emitted together with a metadata dict holding
    the text of the headers that are active at that point of the document.
    """

    def __init__(
        self,
        headers_to_split_on: List[Tuple[str, str]],
        return_each_line: bool = False,
        strip_headers: bool = True,
    ) -> None:
        """Create a new MarkdownHeaderTextSplitter.

        Args:
            headers_to_split_on: ``(marker, metadata_key)`` pairs, e.g.
                ``("#", "Header 1")``; the header text is stored in the
                metadata under ``metadata_key``.
            return_each_line: If True, ``split_text`` returns the collected
                pieces as-is instead of merging pieces with common metadata.
            strip_headers: If True, header lines are left out of the chunk
                content; if False, they are kept as content.
        """
        # Output line-by-line or aggregated into chunks w/ common headers
        self.return_each_line = return_each_line
        # Stored longest marker first (e.g. "##" before "#"); split_text tries
        # the markers in this order.
        self.headers_to_split_on = sorted(
            headers_to_split_on, key=lambda split: len(split[0]), reverse=True
        )
        # Whether header lines are removed from the content of the chunk
        self.strip_headers = strip_headers

    def aggregate_lines_to_chunks(self, lines: List[LineType]) -> List[Document]:
        """Combine consecutive lines with common metadata into chunks.

        The entries of ``lines`` are not modified; aggregation happens on
        fresh dicts.

        Args:
            lines: Pieces of text with their associated header metadata.

        Returns:
            One ``Document`` per aggregated chunk, in input order.
        """
        aggregated_chunks: List[LineType] = []

        for line in lines:
            previous = aggregated_chunks[-1] if aggregated_chunks else None

            if previous is not None and previous["metadata"] == line["metadata"]:
                # Same metadata as the last chunk: extend that chunk.
                # NOTE(review): a Markdown hard line break needs two trailing
                # spaces; confirm the single space here is intentional and not
                # whitespace lost in this copy of the file.
                previous["content"] += " \n" + line["content"]
            elif (
                previous is not None
                and not self.strip_headers
                # may be issues if other metadata is present
                and len(previous["metadata"]) < len(line["metadata"])
                # startswith() instead of [0] so an empty last line cannot
                # raise IndexError
                and previous["content"].split("\n")[-1].startswith("#")
            ):
                # The last chunk has different (and fewer) metadata entries,
                # ends with a header line, and headers are being kept:
                # fold the current line into it and adopt its metadata.
                previous["content"] += " \n" + line["content"]
                previous["metadata"] = line["metadata"]
            else:
                # Otherwise start a new chunk. Build a new dict so the "+="
                # above never mutates the caller's input.
                aggregated_chunks.append(
                    {"content": line["content"], "metadata": line["metadata"]}
                )

        return [
            Document(page_content=chunk["content"], metadata=chunk["metadata"])
            for chunk in aggregated_chunks
        ]

    def split_text(self, text: str) -> List[Document]:
        """Split Markdown text into documents carrying header metadata.

        Args:
            text: Markdown text.

        Returns:
            Documents whose metadata maps each configured header name to the
            text of the header active for that content. Pieces with common
            metadata are merged unless ``return_each_line`` is set.
        """
        # Split the input text by newline character ("\n").
        lines = text.split("\n")
        # Final output
        lines_with_metadata: List[LineType] = []
        # Content and metadata of the chunk currently being processed
        current_content: List[str] = []
        current_metadata: Dict[str, str] = {}
        # Keep track of the nested header structure
        header_stack: List[HeaderType] = []
        initial_metadata: Dict[str, str] = {}

        in_code_block = False
        opening_fence = ""

        for line in lines:
            stripped_line = line.strip()
            # Remove all non-printable characters (e.g. a leading "\ufeff") so
            # they cannot hide a header marker.
            # NOTE(review): str.isprintable() is also False for tab, so tabs
            # inside a line are dropped as well.
            stripped_line = "".join(filter(str.isprintable, stripped_line))

            if not in_code_block:
                # A line holding a second ``` is treated as an inline code
                # span, not as an opening fence.
                if stripped_line.startswith("```") and stripped_line.count("```") == 1:
                    in_code_block = True
                    opening_fence = "```"
                elif stripped_line.startswith("~~~"):
                    in_code_block = True
                    opening_fence = "~~~"
            else:
                if stripped_line.startswith(opening_fence):
                    # Closing fence: this line falls through to the normal
                    # handling below and ends up as ordinary content.
                    in_code_block = False
                    opening_fence = ""

            if in_code_block:
                # Inside a fence nothing is interpreted as a header.
                current_content.append(stripped_line)
                continue

            # Check each line against each of the header types (e.g., #, ##)
            for sep, name in self.headers_to_split_on:
                # Check if line starts with a header that we intend to split on
                if stripped_line.startswith(sep) and (
                    # Header with no text OR header is followed by space
                    # Both are valid conditions that sep is being used a header
                    len(stripped_line) == len(sep) or stripped_line[len(sep)] == " "
                ):
                    # Ensure we are tracking the header as metadata
                    if name is not None:
                        # Get the current header level
                        current_header_level = sep.count("#")

                        # Pop out headers of lower or same level from the stack
                        while (
                            header_stack
                            and header_stack[-1]["level"] >= current_header_level
                        ):
                            # We have encountered a new header
                            # at the same or higher level
                            popped_header = header_stack.pop()
                            # Clear the metadata for the
                            # popped header in initial_metadata
                            initial_metadata.pop(popped_header["name"], None)

                        # Push the current header to the stack
                        header: HeaderType = {
                            "level": current_header_level,
                            "name": name,
                            "data": stripped_line[len(sep) :].strip(),
                        }
                        header_stack.append(header)
                        # Update initial_metadata with the current header
                        initial_metadata[name] = header["data"]

                    # Flush the content gathered so far, still under the
                    # metadata that was in effect before this header.
                    if current_content:
                        lines_with_metadata.append(
                            {
                                "content": "\n".join(current_content),
                                "metadata": current_metadata.copy(),
                            }
                        )
                        current_content.clear()

                    if not self.strip_headers:
                        current_content.append(stripped_line)

                    break
            else:
                # No header marker matched this line.
                if stripped_line:
                    current_content.append(stripped_line)
                elif current_content:
                    # A blank line ends the piece being collected.
                    lines_with_metadata.append(
                        {
                            "content": "\n".join(current_content),
                            "metadata": current_metadata.copy(),
                        }
                    )
                    current_content.clear()

            current_metadata = initial_metadata.copy()

        if current_content:
            lines_with_metadata.append(
                {"content": "\n".join(current_content), "metadata": current_metadata}
            )

        # lines_with_metadata has each line with associated header metadata
        # aggregate these into chunks based on common metadata
        if not self.return_each_line:
            return self.aggregate_lines_to_chunks(lines_with_metadata)
        else:
            return [
                Document(page_content=chunk["content"], metadata=chunk["metadata"])
                for chunk in lines_with_metadata
            ]
|
|
|
|
|
|
class LineType(TypedDict):
    """Typed dict pairing a piece of text with its header metadata."""

    # Header metadata that applies to the text (string keys, string values).
    metadata: Dict[str, str]
    # The text itself.
    content: str
|
|
|
|
|
|
class HeaderType(TypedDict):
    """Typed dict describing a single header."""

    # Header level as an integer.
    level: int
    # Name of the header.
    name: str
    # Data carried by the header.
    data: str
|