You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

168 lines
6.0 KiB

import functools
import logging
import re
from pathlib import Path
from typing import Any, Dict, Iterator, Union
import yaml
from langchain_core.documents import Document
from langchain_community.document_loaders.base import BaseLoader
logger = logging.getLogger(__name__)
class ObsidianLoader(BaseLoader):
"""Load `Obsidian` files from directory."""
FRONT_MATTER_REGEX = re.compile(r"^---\n(.*?)\n---\n", re.DOTALL)
TEMPLATE_VARIABLE_REGEX = re.compile(r"{{(.*?)}}", re.DOTALL)
TAG_REGEX = re.compile(r"[^\S\/]#([a-zA-Z_]+[-_/\w]*)")
DATAVIEW_LINE_REGEX = re.compile(r"^\s*(\w+)::\s*(.*)$", re.MULTILINE)
DATAVIEW_INLINE_BRACKET_REGEX = re.compile(r"\[(\w+)::\s*(.*)\]", re.MULTILINE)
DATAVIEW_INLINE_PAREN_REGEX = re.compile(r"\((\w+)::\s*(.*)\)", re.MULTILINE)
def __init__(
path: Union[str, Path],
encoding: str = "UTF-8",
collect_metadata: bool = True,
"""Initialize with a path.
path: Path to the directory containing the Obsidian files.
encoding: Charset encoding, defaults to "UTF-8"
collect_metadata: Whether to collect metadata from the front matter.
Defaults to True.
self.file_path = path
self.encoding = encoding
self.collect_metadata = collect_metadata
def _replace_template_var(
self, placeholders: Dict[str, str], match: re.Match
) -> str:
"""Replace a template variable with a placeholder."""
placeholder = f"__TEMPLATE_VAR_{len(placeholders)}__"
placeholders[placeholder] =
return placeholder
def _restore_template_vars(self, obj: Any, placeholders: Dict[str, str]) -> Any:
"""Restore template variables replaced with placeholders to original values."""
if isinstance(obj, str):
for placeholder, value in placeholders.items():
obj = obj.replace(placeholder, f"{{{{{value}}}}}")
elif isinstance(obj, dict):
for key, value in obj.items():
obj[key] = self._restore_template_vars(value, placeholders)
elif isinstance(obj, list):
for i, item in enumerate(obj):
obj[i] = self._restore_template_vars(item, placeholders)
return obj
def _parse_front_matter(self, content: str) -> dict:
"""Parse front matter metadata from the content and return it as a dict."""
if not self.collect_metadata:
return {}
match =
if not match:
return {}
placeholders: Dict[str, str] = {}
replace_template_var = functools.partial(
self._replace_template_var, placeholders
front_matter_text = self.TEMPLATE_VARIABLE_REGEX.sub(
front_matter = yaml.safe_load(front_matter_text)
front_matter = self._restore_template_vars(front_matter, placeholders)
# If tags are a string, split them into a list
if "tags" in front_matter and isinstance(front_matter["tags"], str):
front_matter["tags"] = front_matter["tags"].split(", ")
return front_matter
except yaml.parser.ParserError:
logger.warning("Encountered non-yaml frontmatter")
return {}
def _to_langchain_compatible_metadata(self, metadata: dict) -> dict:
"""Convert a dictionary to a compatible with langchain."""
result = {}
for key, value in metadata.items():
if type(value) in {str, int, float}:
result[key] = value
result[key] = str(value)
return result
def _parse_document_tags(self, content: str) -> set:
"""Return a set of all tags in within the document."""
if not self.collect_metadata:
return set()
match = self.TAG_REGEX.findall(content)
if not match:
return set()
return {tag for tag in match}
def _parse_dataview_fields(self, content: str) -> dict:
"""Parse obsidian dataview plugin fields from the content and return it
as a dict."""
if not self.collect_metadata:
return {}
return {
match[0]: match[1]
for match in self.DATAVIEW_LINE_REGEX.findall(content)
match[0]: match[1]
for match in self.DATAVIEW_INLINE_PAREN_REGEX.findall(content)
match[0]: match[1]
for match in self.DATAVIEW_INLINE_BRACKET_REGEX.findall(content)
def _remove_front_matter(self, content: str) -> str:
"""Remove front matter metadata from the given content."""
if not self.collect_metadata:
return content
return self.FRONT_MATTER_REGEX.sub("", content)
def lazy_load(self) -> Iterator[Document]:
paths = list(Path(self.file_path).glob("**/*.md"))
for path in paths:
with open(path, encoding=self.encoding) as f:
text =
front_matter = self._parse_front_matter(text)
tags = self._parse_document_tags(text)
dataview_fields = self._parse_dataview_fields(text)
text = self._remove_front_matter(text)
metadata = {
"source": str(,
"path": str(path),
"created": path.stat().st_ctime,
"last_modified": path.stat().st_mtime,
"last_accessed": path.stat().st_atime,
if tags or front_matter.get("tags"):
metadata["tags"] = ",".join(
tags | set(front_matter.get("tags", []) or [])
yield Document(page_content=text, metadata=metadata)