You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
393 lines
13 KiB
Python
393 lines
13 KiB
Python
"""
|
|
functionality:
|
|
- get metadata from youtube for a playlist
|
|
- index and update in es
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime
|
|
|
|
from home.src.download.thumbnails import ThumbManager
|
|
from home.src.es.connect import ElasticWrap
|
|
from home.src.index.generic import YouTubeItem
|
|
from home.src.index.video import YoutubeVideo
|
|
|
|
|
|
class YoutubePlaylist(YouTubeItem):
|
|
"""represents a single youtube playlist"""
|
|
|
|
es_path = False
|
|
index_name = "ta_playlist"
|
|
yt_obs = {
|
|
"extract_flat": True,
|
|
"allow_playlist_files": True,
|
|
}
|
|
yt_base = "https://www.youtube.com/playlist?list="
|
|
|
|
def __init__(self, youtube_id):
|
|
super().__init__(youtube_id)
|
|
self.all_members = False
|
|
self.nav = False
|
|
self.all_youtube_ids = []
|
|
|
|
def build_json(self, scrape=False):
|
|
"""collection to create json_data"""
|
|
self.get_from_es()
|
|
if self.json_data:
|
|
subscribed = self.json_data.get("playlist_subscribed")
|
|
else:
|
|
subscribed = False
|
|
|
|
if scrape or not self.json_data:
|
|
self.get_from_youtube()
|
|
if not self.youtube_meta:
|
|
self.json_data = False
|
|
return
|
|
|
|
self.process_youtube_meta()
|
|
self.get_entries()
|
|
self.json_data["playlist_entries"] = self.all_members
|
|
self.json_data["playlist_subscribed"] = subscribed
|
|
|
|
def process_youtube_meta(self):
|
|
"""extract relevant fields from youtube"""
|
|
try:
|
|
playlist_thumbnail = self.youtube_meta["thumbnails"][-1]["url"]
|
|
except IndexError:
|
|
print(f"{self.youtube_id}: thumbnail extraction failed")
|
|
playlist_thumbnail = False
|
|
|
|
self.json_data = {
|
|
"playlist_id": self.youtube_id,
|
|
"playlist_active": True,
|
|
"playlist_name": self.youtube_meta["title"],
|
|
"playlist_channel": self.youtube_meta["channel"],
|
|
"playlist_channel_id": self.youtube_meta["channel_id"],
|
|
"playlist_thumbnail": playlist_thumbnail,
|
|
"playlist_description": self.youtube_meta["description"] or False,
|
|
"playlist_last_refresh": int(datetime.now().timestamp()),
|
|
"playlist_type": "regular",
|
|
}
|
|
|
|
def get_entries(self, playlistend=False):
|
|
"""get all videos in playlist"""
|
|
if playlistend:
|
|
# implement playlist end
|
|
print(playlistend)
|
|
all_members = []
|
|
for idx, entry in enumerate(self.youtube_meta["entries"]):
|
|
if self.all_youtube_ids:
|
|
downloaded = entry["id"] in self.all_youtube_ids
|
|
else:
|
|
downloaded = False
|
|
if not entry["channel"]:
|
|
continue
|
|
to_append = {
|
|
"youtube_id": entry["id"],
|
|
"title": entry["title"],
|
|
"uploader": entry["channel"],
|
|
"idx": idx,
|
|
"downloaded": downloaded,
|
|
}
|
|
all_members.append(to_append)
|
|
|
|
self.all_members = all_members
|
|
|
|
def get_playlist_art(self):
|
|
"""download artwork of playlist"""
|
|
url = self.json_data["playlist_thumbnail"]
|
|
ThumbManager(self.youtube_id, item_type="playlist").download(url)
|
|
|
|
def add_vids_to_playlist(self):
|
|
"""sync the playlist id to videos"""
|
|
script = (
|
|
'if (!ctx._source.containsKey("playlist")) '
|
|
+ "{ctx._source.playlist = [params.playlist]} "
|
|
+ "else if (!ctx._source.playlist.contains(params.playlist)) "
|
|
+ "{ctx._source.playlist.add(params.playlist)} "
|
|
+ "else {ctx.op = 'none'}"
|
|
)
|
|
|
|
bulk_list = []
|
|
for entry in self.json_data["playlist_entries"]:
|
|
video_id = entry["youtube_id"]
|
|
action = {"update": {"_id": video_id, "_index": "ta_video"}}
|
|
source = {
|
|
"script": {
|
|
"source": script,
|
|
"lang": "painless",
|
|
"params": {"playlist": self.youtube_id},
|
|
}
|
|
}
|
|
bulk_list.append(json.dumps(action))
|
|
bulk_list.append(json.dumps(source))
|
|
|
|
# add last newline
|
|
bulk_list.append("\n")
|
|
query_str = "\n".join(bulk_list)
|
|
|
|
ElasticWrap("_bulk").post(query_str, ndjson=True)
|
|
|
|
def update_playlist(self):
|
|
"""update metadata for playlist with data from YouTube"""
|
|
self.get_from_es()
|
|
subscribed = self.json_data["playlist_subscribed"]
|
|
self.get_from_youtube()
|
|
if not self.json_data:
|
|
# return false to deactivate
|
|
return False
|
|
|
|
self.json_data["playlist_subscribed"] = subscribed
|
|
self.upload_to_es()
|
|
return True
|
|
|
|
def build_nav(self, youtube_id):
|
|
"""find next and previous in playlist of a given youtube_id"""
|
|
all_entries_available = self.json_data["playlist_entries"]
|
|
all_entries = [i for i in all_entries_available if i["downloaded"]]
|
|
current = [i for i in all_entries if i["youtube_id"] == youtube_id]
|
|
# stop if not found or playlist of 1
|
|
if not current or not len(all_entries) > 1:
|
|
return
|
|
|
|
current_idx = all_entries.index(current[0])
|
|
if current_idx == 0:
|
|
previous_item = False
|
|
else:
|
|
previous_item = all_entries[current_idx - 1]
|
|
prev_id = previous_item["youtube_id"]
|
|
previous_item["vid_thumb"] = ThumbManager(prev_id).vid_thumb_path()
|
|
|
|
if current_idx == len(all_entries) - 1:
|
|
next_item = False
|
|
else:
|
|
next_item = all_entries[current_idx + 1]
|
|
next_id = next_item["youtube_id"]
|
|
next_item["vid_thumb"] = ThumbManager(next_id).vid_thumb_path()
|
|
|
|
self.nav = {
|
|
"playlist_meta": {
|
|
"current_idx": current[0]["idx"],
|
|
"playlist_id": self.youtube_id,
|
|
"playlist_name": self.json_data["playlist_name"],
|
|
"playlist_channel": self.json_data["playlist_channel"],
|
|
},
|
|
"playlist_previous": previous_item,
|
|
"playlist_next": next_item,
|
|
}
|
|
return
|
|
|
|
def delete_metadata(self):
|
|
"""delete metadata for playlist"""
|
|
self.delete_videos_metadata()
|
|
script = (
|
|
"ctx._source.playlist.removeAll("
|
|
+ "Collections.singleton(params.playlist)) "
|
|
)
|
|
data = {
|
|
"query": {
|
|
"term": {"playlist.keyword": {"value": self.youtube_id}}
|
|
},
|
|
"script": {
|
|
"source": script,
|
|
"lang": "painless",
|
|
"params": {"playlist": self.youtube_id},
|
|
},
|
|
}
|
|
_, _ = ElasticWrap("ta_video/_update_by_query").post(data)
|
|
self.del_in_es()
|
|
|
|
def is_custom_playlist(self):
|
|
self.get_from_es()
|
|
return self.json_data["playlist_type"] == "custom"
|
|
|
|
def delete_videos_metadata(self, channel_id=None):
|
|
"""delete video metadata for a specific channel"""
|
|
self.get_from_es()
|
|
playlist = self.json_data["playlist_entries"]
|
|
i = 0
|
|
while i < len(playlist):
|
|
video_id = playlist[i]["youtube_id"]
|
|
video = YoutubeVideo(video_id)
|
|
video.get_from_es()
|
|
if (
|
|
channel_id is None
|
|
or video.json_data["channel"]["channel_id"] == channel_id
|
|
):
|
|
playlist.pop(i)
|
|
self.remove_playlist_from_video(video_id)
|
|
i -= 1
|
|
i += 1
|
|
self.set_playlist_thumbnail()
|
|
self.upload_to_es()
|
|
|
|
def delete_videos_playlist(self):
|
|
"""delete playlist with all videos"""
|
|
print(f"{self.youtube_id}: delete playlist")
|
|
self.get_from_es()
|
|
all_youtube_id = [
|
|
i["youtube_id"]
|
|
for i in self.json_data["playlist_entries"]
|
|
if i["downloaded"]
|
|
]
|
|
for youtube_id in all_youtube_id:
|
|
YoutubeVideo(youtube_id).delete_media_file()
|
|
|
|
self.delete_metadata()
|
|
|
|
def create(self, name):
|
|
self.json_data = {
|
|
"playlist_id": self.youtube_id,
|
|
"playlist_active": False,
|
|
"playlist_name": name,
|
|
"playlist_last_refresh": int(datetime.now().timestamp()),
|
|
"playlist_entries": [],
|
|
"playlist_type": "custom",
|
|
"playlist_channel": None,
|
|
"playlist_channel_id": None,
|
|
"playlist_description": False,
|
|
"playlist_thumbnail": False,
|
|
"playlist_subscribed": False,
|
|
}
|
|
self.upload_to_es()
|
|
self.get_playlist_art()
|
|
return True
|
|
|
|
def add_video_to_playlist(self, video_id):
|
|
self.get_from_es()
|
|
video_metadata = self.get_video_metadata(video_id)
|
|
video_metadata["idx"] = len(self.json_data["playlist_entries"])
|
|
|
|
if not self.playlist_entries_contains(video_id):
|
|
self.json_data["playlist_entries"].append(video_metadata)
|
|
self.json_data["playlist_last_refresh"] = int(
|
|
datetime.now().timestamp()
|
|
)
|
|
self.set_playlist_thumbnail()
|
|
self.upload_to_es()
|
|
video = YoutubeVideo(video_id)
|
|
video.get_from_es()
|
|
if "playlist" not in video.json_data:
|
|
video.json_data["playlist"] = []
|
|
video.json_data["playlist"].append(self.youtube_id)
|
|
video.upload_to_es()
|
|
return True
|
|
|
|
def remove_playlist_from_video(self, video_id):
|
|
video = YoutubeVideo(video_id)
|
|
video.get_from_es()
|
|
if video.json_data is not None and "playlist" in video.json_data:
|
|
video.json_data["playlist"].remove(self.youtube_id)
|
|
video.upload_to_es()
|
|
|
|
def move_video(self, video_id, action, hide_watched=False):
|
|
self.get_from_es()
|
|
video_index = self.get_video_index(video_id)
|
|
playlist = self.json_data["playlist_entries"]
|
|
item = playlist[video_index]
|
|
playlist.pop(video_index)
|
|
if action == "remove":
|
|
self.remove_playlist_from_video(item["youtube_id"])
|
|
else:
|
|
if action == "up":
|
|
while True:
|
|
video_index = max(0, video_index - 1)
|
|
if (
|
|
not hide_watched
|
|
or video_index == 0
|
|
or (
|
|
not self.get_video_is_watched(
|
|
playlist[video_index]["youtube_id"]
|
|
)
|
|
)
|
|
):
|
|
break
|
|
elif action == "down":
|
|
while True:
|
|
video_index = min(len(playlist), video_index + 1)
|
|
if (
|
|
not hide_watched
|
|
or video_index == len(playlist)
|
|
or (
|
|
not self.get_video_is_watched(
|
|
playlist[video_index - 1]["youtube_id"]
|
|
)
|
|
)
|
|
):
|
|
break
|
|
elif action == "top":
|
|
video_index = 0
|
|
else:
|
|
video_index = len(playlist)
|
|
playlist.insert(video_index, item)
|
|
self.json_data["playlist_last_refresh"] = int(
|
|
datetime.now().timestamp()
|
|
)
|
|
|
|
for i, item in enumerate(playlist):
|
|
item["idx"] = i
|
|
|
|
self.set_playlist_thumbnail()
|
|
self.upload_to_es()
|
|
|
|
return True
|
|
|
|
def del_video(self, video_id):
|
|
playlist = self.json_data["playlist_entries"]
|
|
|
|
i = 0
|
|
while i < len(playlist):
|
|
if video_id == playlist[i]["youtube_id"]:
|
|
playlist.pop(i)
|
|
self.set_playlist_thumbnail()
|
|
i -= 1
|
|
i += 1
|
|
|
|
def get_video_index(self, video_id):
|
|
for i, child in enumerate(self.json_data["playlist_entries"]):
|
|
if child["youtube_id"] == video_id:
|
|
return i
|
|
return -1
|
|
|
|
def playlist_entries_contains(self, video_id):
|
|
return (
|
|
len(
|
|
list(
|
|
filter(
|
|
lambda x: x["youtube_id"] == video_id,
|
|
self.json_data["playlist_entries"],
|
|
)
|
|
)
|
|
)
|
|
> 0
|
|
)
|
|
|
|
def get_video_is_watched(self, video_id):
|
|
video = YoutubeVideo(video_id)
|
|
video.get_from_es()
|
|
return video.json_data["player"]["watched"]
|
|
|
|
def set_playlist_thumbnail(self):
|
|
playlist = self.json_data["playlist_entries"]
|
|
self.json_data["playlist_thumbnail"] = False
|
|
|
|
for video in playlist:
|
|
url = ThumbManager(video["youtube_id"]).vid_thumb_path()
|
|
if url is not None:
|
|
self.json_data["playlist_thumbnail"] = url
|
|
break
|
|
self.get_playlist_art()
|
|
|
|
def get_video_metadata(self, video_id):
|
|
video = YoutubeVideo(video_id)
|
|
video.get_from_es()
|
|
video_json_data = {
|
|
"youtube_id": video.json_data["youtube_id"],
|
|
"title": video.json_data["title"],
|
|
"uploader": video.json_data["channel"]["channel_name"],
|
|
"idx": 0,
|
|
"downloaded": "date_downloaded" in video.json_data
|
|
and video.json_data["date_downloaded"] > 0,
|
|
}
|
|
return video_json_data
|