langchain/libs/community/langchain_community/document_loaders/youtube.py
Leonid Ganeline 932c52c333
community[patch]: docstrings (#16810)
- added missing docstrings
- formatted docstrings to a consistent form
2024-02-09 12:48:57 -08:00

450 lines
15 KiB
Python

"""Loads YouTube transcript."""
from __future__ import annotations
import logging
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Union
from urllib.parse import parse_qs, urlparse
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.pydantic_v1.dataclasses import dataclass
from langchain_community.document_loaders.base import BaseLoader
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# OAuth scopes passed to the Google credential helpers when user credentials
# are loaded from a token file or created through the installed-app flow.
SCOPES = ["https://www.googleapis.com/auth/youtube.readonly"]
@dataclass
class GoogleApiClient:
    """Generic Google API Client.

    To use, you should have the ``google_auth_oauthlib,youtube_transcript_api,google``
    python package installed.
    As the google api expects credentials you need to set up a google account and
    register your Service. "https://developers.google.com/docs/api/quickstart/python"

    Attributes:
        credentials_path: Client-secrets file used for the interactive
            installed-app OAuth flow.
        service_account_path: Service-account key file. When this file exists
            it takes precedence over every other credential source.
        token_path: File holding the cached user token. It is read when it
            exists and (re)written after a refresh or a new OAuth flow.

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )

    """

    credentials_path: Path = Path.home() / ".credentials" / "credentials.json"
    service_account_path: Path = Path.home() / ".credentials" / "credentials.json"
    token_path: Path = Path.home() / ".credentials" / "token.json"

    def __post_init__(self) -> None:
        """Load the credentials once and keep them on ``self.creds``."""
        self.creds = self._load_credentials()

    @root_validator
    def validate_channel_or_videoIds_is_set(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate that a credentials or service-account path is set.

        The method name is historical; the check is about the credential
        paths, not about channels or video ids.

        Args:
            values: Field values collected for the dataclass.

        Returns:
            The unchanged ``values`` mapping.

        Raises:
            ValueError: If both ``credentials_path`` and
                ``service_account_path`` are missing or falsy.
        """
        if not values.get("credentials_path") and not values.get(
            "service_account_path"
        ):
            raise ValueError(
                "Must specify either credentials_path or service_account_path"
            )
        return values

    def _load_credentials(self) -> Any:
        """Load credentials.

        Resolution order:

        1. If ``service_account_path`` exists, return service-account
           credentials built from it.
        2. Otherwise load the cached user token from ``token_path`` if that
           file exists.
        3. If there is no valid token, refresh it when it is expired and has a
           refresh token; otherwise run the installed-app flow with
           ``credentials_path``. The resulting token is written back to
           ``token_path``.

        Returns:
            The credentials object produced by the Google auth libraries.

        Raises:
            ImportError: If one of the required Google / transcript packages
                is not installed.
        """
        # Adapted from https://developers.google.com/drive/api/v3/quickstart/python
        try:
            from google.auth.transport.requests import Request
            from google.oauth2 import service_account
            from google.oauth2.credentials import Credentials
            from google_auth_oauthlib.flow import InstalledAppFlow
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the YouTube loader"
            ) from e

        if self.service_account_path.exists():
            return service_account.Credentials.from_service_account_file(
                str(self.service_account_path)
            )

        creds = None
        if self.token_path.exists():
            creds = Credentials.from_authorized_user_file(str(self.token_path), SCOPES)

        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    str(self.credentials_path), SCOPES
                )
                creds = flow.run_local_server(port=0)
            # Persist the refreshed / newly obtained token for the next run.
            with open(self.token_path, "w", encoding="utf-8") as token:
                token.write(creds.to_json())

        return creds
# URL schemes accepted by `_parse_video_id`.
ALLOWED_SCHEMAS = {"http", "https"}
# Host names (lower case) accepted by `_parse_video_id`.
ALLOWED_NETLOCK = {
    "youtu.be",
    "m.youtube.com",
    "youtube.com",
    "www.youtube.com",
    "www.youtube-nocookie.com",
    "vid.plus",
}


def _parse_video_id(url: str) -> Optional[str]:
    """Parse a youtube url and return the video id if valid, otherwise None.

    Args:
        url: The URL to inspect.

    Returns:
        The 11-character video id, or ``None`` when the scheme or host is not
        allowed, when a ``/watch`` URL has no ``v`` query parameter, or when
        the candidate id is not exactly 11 characters long.
    """
    parsed_url = urlparse(url)

    if parsed_url.scheme not in ALLOWED_SCHEMAS:
        return None

    # Host names are case-insensitive, so compare in lower case.
    if parsed_url.netloc.lower() not in ALLOWED_NETLOCK:
        return None

    # Ignore trailing slashes so ".../<id>/" and ".../watch/?v=<id>" still parse.
    path = parsed_url.path.rstrip("/")

    if path.endswith("/watch"):
        # parse_qs maps each key to a list of values; take the first "v".
        candidates = parse_qs(parsed_url.query).get("v")
        if not candidates:
            return None
        video_id = candidates[0]
    else:
        # Short / embed style URLs carry the id as the last path segment.
        video_id = path.split("/")[-1]

    if len(video_id) != 11:  # Video IDs are 11 characters long
        return None

    return video_id
class TranscriptFormat(Enum):
    """Output layout used when a YouTube transcript is turned into documents."""

    # All transcript pieces joined into a single text.
    TEXT = "text"
    # One entry per transcript piece.
    LINES = "lines"
class YoutubeLoader(BaseLoader):
    """Load `YouTube` transcripts."""

    def __init__(
        self,
        video_id: str,
        add_video_info: bool = False,
        language: Union[str, Sequence[str]] = "en",
        translation: Optional[str] = None,
        transcript_format: TranscriptFormat = TranscriptFormat.TEXT,
        continue_on_failure: bool = False,
    ):
        """Initialize with YouTube video ID.

        Args:
            video_id: Id of the video whose transcript is loaded.
            add_video_info: When true, ``load`` adds the fields returned by
                ``_get_video_info`` to the document metadata.
            language: Preferred transcript language code, or a sequence of
                codes. A single string is wrapped in a list.
            translation: When not ``None``, the transcript is translated to
                this language before it is fetched.
            transcript_format: Whether ``load`` returns one document with the
                whole text or one document per transcript piece.
            continue_on_failure: Stored on the instance; no method of this
                class reads it.
        """
        self.video_id = video_id
        self.add_video_info = add_video_info
        # Always keep a sequence of language codes.
        self.language = [language] if isinstance(language, str) else language
        self.translation = translation
        self.transcript_format = transcript_format
        self.continue_on_failure = continue_on_failure

    @staticmethod
    def extract_video_id(youtube_url: str) -> str:
        """Extract video id from common YT urls.

        Args:
            youtube_url: URL of the video.

        Returns:
            The video id parsed from the URL.

        Raises:
            ValueError: If no video id can be determined from the URL.
        """
        video_id = _parse_video_id(youtube_url)
        if not video_id:
            raise ValueError(
                f"Could not determine the video ID for the URL {youtube_url}"
            )
        return video_id

    @classmethod
    def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader:
        """Given youtube URL, load video.

        Args:
            youtube_url: URL of the video.
            **kwargs: Forwarded unchanged to the constructor.

        Returns:
            A loader for the video id extracted from the URL.
        """
        video_id = cls.extract_video_id(youtube_url)
        return cls(video_id, **kwargs)

    def load(self) -> List[Document]:
        """Load documents.

        Returns:
            An empty list when transcripts are disabled for the video.
            Otherwise one document holding the whole transcript
            (``TranscriptFormat.TEXT``) or one document per transcript piece
            (``TranscriptFormat.LINES``).

        Raises:
            ImportError: If ``youtube_transcript_api`` is not installed.
            ValueError: If ``transcript_format`` is not a known format.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
                YouTubeTranscriptApi,
            )
        except ImportError as e:
            raise ImportError(
                "Could not import youtube_transcript_api python package. "
                "Please install it with `pip install youtube-transcript-api`."
            ) from e

        metadata = {"source": self.video_id}

        if self.add_video_info:
            # Get more video meta info
            # Such as title, description, thumbnail url, publish_date
            metadata.update(self._get_video_info())

        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
        except TranscriptsDisabled:
            return []

        try:
            transcript = transcript_list.find_transcript(self.language)
        except NoTranscriptFound:
            # Fall back to English when none of the requested languages exist.
            transcript = transcript_list.find_transcript(["en"])

        if self.translation is not None:
            transcript = transcript.translate(self.translation)

        transcript_pieces = transcript.fetch()

        if self.transcript_format == TranscriptFormat.TEXT:
            text = " ".join([piece["text"].strip(" ") for piece in transcript_pieces])
            return [Document(page_content=text, metadata=metadata)]

        if self.transcript_format == TranscriptFormat.LINES:
            # Each piece becomes its own document; every key of the piece
            # except "text" is kept as that document's metadata.
            return [
                Document(
                    page_content=piece["text"].strip(" "),
                    metadata={key: piece[key] for key in piece if key != "text"},
                )
                for piece in transcript_pieces
            ]

        raise ValueError("Unknown transcript format.")

    def _get_video_info(self) -> dict:
        """Get important video information.

        Components are:
            - title
            - description
            - thumbnail url,
            - publish_date
            - channel_author
            - and more.

        Returns:
            A dict with the keys ``title``, ``description``, ``view_count``,
            ``thumbnail_url``, ``publish_date``, ``length`` and ``author``.
            Missing text values become ``"Unknown"`` and missing numeric
            values become ``0``.

        Raises:
            ImportError: If ``pytube`` is not installed.
        """
        try:
            from pytube import YouTube
        except ImportError as e:
            raise ImportError(
                "Could not import pytube python package. "
                "Please install it with `pip install pytube`."
            ) from e

        yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}")
        video_info = {
            "title": yt.title or "Unknown",
            "description": yt.description or "Unknown",
            "view_count": yt.views or 0,
            "thumbnail_url": yt.thumbnail_url or "Unknown",
            "publish_date": yt.publish_date.strftime("%Y-%m-%d %H:%M:%S")
            if yt.publish_date
            else "Unknown",
            "length": yt.length or 0,
            "author": yt.author or "Unknown",
        }
        return video_info
@dataclass
class GoogleApiYoutubeLoader(BaseLoader):
    """Load all Videos from a `YouTube` Channel.

    To use, you should have the ``googleapiclient,youtube_transcript_api``
    python package installed.
    As the service needs a google_api_client, you first have to initialize
    the GoogleApiClient.

    Additionally you have to either provide a channel name or a list of videoids
    "https://developers.google.com/docs/api/quickstart/python"

    Attributes:
        google_api_client: Client whose ``creds`` are used to build the
            YouTube API client.
        channel_name: Name of the channel to load. Takes precedence over
            ``video_ids`` in ``load``.
        video_ids: Explicit video ids to load when no channel name is given.
        add_video_info: When true, the search-result snippet (without its
            thumbnails) is added to the metadata of channel documents.
        captions_language: Language code of the captions to fetch.
        continue_on_failure: When true, transcript errors while loading a
            channel are logged and the video is skipped instead of raising.

    Example:
        .. code-block:: python

            from langchain_community.document_loaders import GoogleApiClient
            from langchain_community.document_loaders import GoogleApiYoutubeLoader
            google_api_client = GoogleApiClient(
                service_account_path=Path("path_to_your_sec_file.json")
            )
            loader = GoogleApiYoutubeLoader(
                google_api_client=google_api_client,
                channel_name = "CodeAesthetic"
            )
            loader.load()

    """

    google_api_client: GoogleApiClient
    channel_name: Optional[str] = None
    video_ids: Optional[List[str]] = None
    add_video_info: bool = True
    captions_language: str = "en"
    continue_on_failure: bool = False

    def __post_init__(self) -> None:
        """Build the YouTube API client from the Google client's credentials."""
        self.youtube_client = self._build_youtube_client(self.google_api_client.creds)

    def _build_youtube_client(self, creds: Any) -> Any:
        """Build a YouTube Data API v3 client.

        Args:
            creds: Credentials passed to ``googleapiclient.discovery.build``.

        Returns:
            The client object returned by ``build("youtube", "v3", ...)``.

        Raises:
            ImportError: If the Google API client or the transcript package
                is not installed.
        """
        try:
            from googleapiclient.discovery import build
            from youtube_transcript_api import YouTubeTranscriptApi  # noqa: F401
        except ImportError as e:
            raise ImportError(
                "You must run "
                "`pip install --upgrade "
                "google-api-python-client google-auth-httplib2 "
                "google-auth-oauthlib "
                "youtube-transcript-api` "
                "to use the YouTube loader"
            ) from e

        return build("youtube", "v3", credentials=creds)

    @root_validator
    def validate_channel_or_videoIds_is_set(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Validate that at least one of channel_name or video_ids is set.

        Args:
            values: Field values collected for the dataclass.

        Returns:
            The unchanged ``values`` mapping.

        Raises:
            ValueError: If both ``channel_name`` and ``video_ids`` are
                missing or empty.
        """
        if not values.get("channel_name") and not values.get("video_ids"):
            raise ValueError("Must specify either channel_name or video_ids")
        return values

    def _get_transcripe_for_video_id(self, video_id: str) -> str:
        """Return the transcript text of a video in ``captions_language``.

        A transcript in ``captions_language`` is used when one exists.
        Otherwise the first available transcript is translated to
        ``captions_language``.

        Args:
            video_id: Id of the video.

        Returns:
            The transcript pieces joined with single spaces.

        Raises:
            NoTranscriptFound: If the video has no transcript in
                ``captions_language`` and no other transcript to translate.
        """
        from youtube_transcript_api import NoTranscriptFound, YouTubeTranscriptApi

        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
        try:
            transcript = transcript_list.find_transcript([self.captions_language])
        except NoTranscriptFound:
            # Translate the first transcript that is available. With nothing to
            # translate, re-raise instead of failing later on an unbound name.
            fallback = next(iter(transcript_list), None)
            if fallback is None:
                raise
            transcript = fallback.translate(self.captions_language)

        transcript_pieces = transcript.fetch()
        return " ".join([piece["text"].strip(" ") for piece in transcript_pieces])

    def _get_document_for_video_id(self, video_id: str, **kwargs: Any) -> Document:
        """Build the document for a single video.

        Args:
            video_id: Id of the video.
            **kwargs: Accepted but not used.

        Returns:
            A document whose content is the transcript and whose metadata is
            the first item of the ``videos().list`` response.
        """
        captions = self._get_transcripe_for_video_id(video_id)
        video_response = (
            self.youtube_client.videos()
            .list(
                part="id,snippet",
                id=video_id,
            )
            .execute()
        )
        return Document(
            page_content=captions,
            metadata=video_response.get("items")[0],
        )

    def _get_channel_id(self, channel_name: str) -> str:
        """Look up the channel id of the first search hit for a channel name.

        Args:
            channel_name: Name used as the search query.

        Returns:
            The ``channelId`` of the first item in the search response.
        """
        request = self.youtube_client.search().list(
            part="id",
            q=channel_name,
            type="channel",
            maxResults=1,  # we only need one result since channel names are unique
        )
        response = request.execute()
        channel_id = response["items"][0]["id"]["channelId"]
        return channel_id

    def _get_document_for_channel(self, channel: str, **kwargs: Any) -> List[Document]:
        """Build one document per video found for a channel.

        Pages through the search results of the channel. Items without a
        ``videoId`` are skipped.

        Args:
            channel: Name of the channel.
            **kwargs: Accepted but not used.

        Returns:
            The documents of all videos whose transcript could be fetched.

        Raises:
            ImportError: If ``youtube_transcript_api`` is not installed.
            TranscriptsDisabled: If a transcript is disabled and
                ``continue_on_failure`` is false.
            NoTranscriptFound: If a transcript is missing and
                ``continue_on_failure`` is false.
        """
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
            )
        except ImportError as e:
            raise ImportError(
                "You must run"
                "`pip install --upgrade "
                "youtube-transcript-api` "
                "to use the youtube loader"
            ) from e

        channel_id = self._get_channel_id(channel)
        request = self.youtube_client.search().list(
            part="id,snippet",
            channelId=channel_id,
            maxResults=50,  # adjust this value to retrieve more or fewer videos
        )
        documents = []
        while request is not None:
            response = request.execute()

            for item in response["items"]:
                video_id = item["id"].get("videoId")
                if not video_id:
                    continue
                meta_data = {"videoId": video_id}
                if self.add_video_info:
                    # Thumbnails are dropped from the snippet before it is
                    # merged into the metadata; tolerate a missing key.
                    item["snippet"].pop("thumbnails", None)
                    meta_data.update(item["snippet"])
                try:
                    page_content = self._get_transcripe_for_video_id(video_id)
                    documents.append(
                        Document(
                            page_content=page_content,
                            metadata=meta_data,
                        )
                    )
                except (TranscriptsDisabled, NoTranscriptFound) as e:
                    if not self.continue_on_failure:
                        raise
                    logger.error(
                        "Error fetching transscript "
                        + f" {video_id}, exception: {e}"
                    )
            # list_next returns None once there are no more pages.
            request = self.youtube_client.search().list_next(request, response)

        return documents

    def load(self) -> List[Document]:
        """Load documents.

        Returns:
            The documents of the channel when ``channel_name`` is set,
            otherwise one document per id in ``video_ids``.

        Raises:
            ValueError: If neither ``channel_name`` nor ``video_ids`` is set.
        """
        if self.channel_name:
            return list(self._get_document_for_channel(self.channel_name))
        if self.video_ids:
            return [
                self._get_document_for_video_id(video_id)
                for video_id in self.video_ids
            ]
        raise ValueError("Must specify either channel_name or video_ids")