@@ -5,36 +5,132 @@ Functionality:
"""
import json
import os
from datetime import datetime
import requests
import yt_dlp
from home.src.download.subscriptions import ChannelSubscription
from home.src.es.connect import IndexPaginate
from home.src.download.subscriptions import (
    ChannelSubscription,
    PlaylistSubscription,
)
from home.src.download.thumbnails import ThumbManager
from home.src.es.connect import ElasticWrap, IndexPaginate
from home.src.index.playlist import YoutubePlaylist
from home.src.ta.config import AppConfig
from home.src.ta.helper import DurationConverter, ignore_filelist
from home.src.ta.helper import DurationConverter
from home.src.ta.ta_redis import RedisArchivist
class PendingList:
    """manage the pending videos list"""

    CONFIG = AppConfig().config
    ES_URL = CONFIG["application"]["es_url"]
    ES_AUTH = CONFIG["application"]["es_auth"]
    VIDEOS = CONFIG["application"]["videos"]


class PendingIndex:
    """base class holding all export methods"""

    def __init__(self):
        self.all_channel_ids = False
        self.all_downloaded = False
        self.missing_from_playlists = []
        self.all_pending = False
        self.all_ignored = False
        self.all_videos = False
        self.all_channels = False
        self.channel_overwrites = False
        self.video_overwrites = False
        self.to_skip = False

    def get_download(self):
        """get a list of all pending videos in ta_download"""
        data = {
            "query": {"match_all": {}},
            "sort": [{"timestamp": {"order": "asc"}}],
        }
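        # match_all sorted by ascending timestamp: the full queue, oldest entries first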
        all_results = IndexPaginate("ta_download", data).get_results()

        self.all_pending = []
        self.all_ignored = []
        self.to_skip = []

        for result in all_results:
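            # every id, pending or ignored, goes into to_skip so it is not queued twice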
            self.to_skip.append(result["youtube_id"])
            if result["status"] == "pending":
                self.all_pending.append(result)
            elif result["status"] == "ignore":
                self.all_ignored.append(result)
    def get_indexed(self):
        """get a list of all videos indexed"""
        data = {
            "query": {"match_all": {}},
            "sort": [{"published": {"order": "desc"}}],
        }
        self.all_videos = IndexPaginate("ta_video", data).get_results()
        for video in self.all_videos:
            self.to_skip.append(video["youtube_id"])

    def get_channels(self):
        """get a list of all channels indexed"""
        self.all_channels = []
        self.channel_overwrites = {}
        data = {
            "query": {"match_all": {}},
            "sort": [{"channel_id": {"order": "asc"}}],
        }
        channels = IndexPaginate("ta_channel", data).get_results()
        for channel in channels:
            channel_id = channel["channel_id"]
            self.all_channels.append(channel_id)
            if channel.get("channel_overwrites"):
                self.channel_overwrites.update(
                    {channel_id: channel.get("channel_overwrites")}
                )

    def parse_url_list(self, youtube_ids):
        self._map_overwrites()

    def _map_overwrites(self):
        """map video ids to channel ids overwrites"""
        self.video_overwrites = {}
        for video in self.all_pending:
            video_id = video["youtube_id"]
            channel_id = video["channel_id"]
            overwrites = self.channel_overwrites.get(channel_id, False)
            if overwrites:
                self.video_overwrites.update({video_id: overwrites})


class PendingInteract:
    """interact with items in download queue"""

    def __init__(self, video_id=False, status=False):
        self.video_id = video_id
        self.status = status

    def delete_item(self):
        """delete single item from pending"""
        path = f"ta_download/_doc/{self.video_id}"
        _, _ = ElasticWrap(path).delete()

    def delete_by_status(self):
        """delete all matching item by status"""
        data = {"query": {"term": {"status": {"value": self.status}}}}
path = " ta_download/_delete_by_query "
_ , _ = ElasticWrap ( path ) . post ( data = data )
def update_status ( self ) :
""" update status field of pending item """
data = { " doc " : { " status " : self . status } }
path = f " ta_download/_update/ { self . video_id } "
_ , _ = ElasticWrap ( path ) . post ( data = data )
class PendingList ( PendingIndex ) :
""" manage the pending videos list """
def __init__ ( self , youtube_ids = False ) :
super ( ) . __init__ ( )
self . youtube_ids = youtube_ids
self . to_skip = False
self . missing_videos = False
def parse_url_list ( self ) :
""" extract youtube ids from list """
missing_videos = [ ]
for entry in youtube_ids :
self . missing_videos = [ ]
self . get_download ( )
self . get_indexed ( )
for entry in self . youtube_ids :
# notify
mess_dict = {
" status " : " message:add " ,
@@ -43,97 +139,89 @@ class PendingList:
" message " : " Extracting lists " ,
}
RedisArchivist ( ) . set_message ( " message:add " , mess_dict )
# extract
url = entry [ " url " ]
url_type = entry [ " type " ]
if url_type == " video " :
missing_videos . append ( url )
elif url_type == " channel " :
video_results = ChannelSubscription ( ) . get_last_youtube_videos (
url , limit = False
)
youtube_ids = [ i [ 0 ] for i in video_results ]
missing_videos = missing_videos + youtube_ids
elif url_type == " playlist " :
self . missing_from_playlists . append ( entry )
playlist = YoutubePlaylist ( url )
playlist . build_json ( )
video_results = playlist . json_data . get ( " playlist_entries " )
youtube_ids = [ i [ " youtube_id " ] for i in video_results ]
missing_videos = missing_videos + youtube_ids
return missing_videos
def add_to_pending ( self , missing_videos , ignore = False ) :
""" build the bulk json data from pending """
# check if channel is indexed
channel_handler = ChannelSubscription ( )
all_indexed = channel_handler . get_channels ( subscribed_only = False )
self . all_channel_ids = [ i [ " channel_id " ] for i in all_indexed ]
# check if already there
self . all_downloaded = self . get_all_downloaded ( )
bulk_list , all_videos_added = self . build_bulk ( missing_videos , ignore )
# add last newline
bulk_list . append ( " \n " )
query_str = " \n " . join ( bulk_list )
headers = { " Content-type " : " application/x-ndjson " }
url = self . ES_URL + " /_bulk "
request = requests . post (
url , data = query_str , headers = headers , auth = self . ES_AUTH
self . _process_entry ( entry )
def _process_entry ( self , entry ) :
""" process single entry from url list """
if entry [ " type " ] == " video " :
self . _add_video ( entry [ " url " ] )
elif entry [ " type " ] == " channel " :
self . _parse_channel ( entry [ " url " ] )
elif entry [ " type " ] == " playlist " :
self . _parse_playlist ( entry [ " url " ] )
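            # register the playlist (unsubscribed) and cache its thumbnail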
            new_thumbs = PlaylistSubscription().process_url_str(
                [entry], subscribed=False
            )
            ThumbManager().download_playlist(new_thumbs)
        else:
            raise ValueError(f"invalid url_type: {entry}")

    def _add_video(self, url):
        """add video to list"""
        if url not in self.missing_videos and url not in self.to_skip:
            self.missing_videos.append(url)

    def _parse_channel(self, url):
        """add all videos of channel to list"""
        video_results = ChannelSubscription().get_last_youtube_videos(
            url, limit=False
        )
        if not request.ok:
            print(request)
            raise ValueError("failed to add video to download queue")
        return all_videos_added

    def build_bulk(self, missing_videos, ignore=False):
        """build the bulk lists"""
        youtube_ids = [i[0] for i in video_results]
        for video_id in youtube_ids:
            self._add_video(video_id)

    def _parse_playlist(self, url):
        """add all videos of playlist to list"""
        playlist = YoutubePlaylist(url)
        playlist.build_json()
        video_results = playlist.json_data.get("playlist_entries")
        youtube_ids = [i["youtube_id"] for i in video_results]
        for video_id in youtube_ids:
            self._add_video(video_id)

    def add_to_pending(self, status="pending"):
        """add missing videos to pending list"""
        self.get_channels()
        bulk_list = []
        all_videos_added = []
        for idx, youtube_id in enumerate(missing_videos):
            # check if already downloaded
            if youtube_id in self.all_downloaded:
        thumb_handler = ThumbManager()
        for idx, youtube_id in enumerate(self.missing_videos):
            video_details = self.get_youtube_details(youtube_id)
            if not video_details:
                continue
            video = self.get_youtube_details(youtube_id)
            # skip on download error
            if not video:
                continue
            channel_indexed = video["channel_id"] in self.all_channel_ids
            video["channel_indexed"] = channel_indexed
            if ignore:
                video["status"] = "ignore"
            else:
                video["status"] = "pending"
            video_details["status"] = status
            action = {"create": {"_id": youtube_id, "_index": "ta_download"}}
            bulk_list.append(json.dumps(action))
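            # the bulk payload pairs each action line with its document (ndjson)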
            bulk_list.append(json.dumps(video))
            all_videos_added.append((youtube_id, video["vid_thumb_url"]))
            # notify
            progress = f"{idx + 1}/{len(missing_videos)}"
            mess_dict = {
                "status": "message:add",
                "level": "info",
                "title": "Adding new videos to download queue.",
                "message": "Progress: " + progress,
            }
            if idx + 1 == len(missing_videos):
                RedisArchivist().set_message(
                    "message:add", mess_dict, expire=4
                )
            else:
                RedisArchivist().set_message("message:add", mess_dict)
            if (idx + 1) % 25 == 0:
print ( " adding to queue progress: " + progress )
bulk_list . append ( json . dumps ( video_details ) )
thumb_needed = [ ( youtube_id , video_details [ " vid_thumb_url " ] ) ]
thumb_handler . download_vid ( thumb_needed )
self . _notify_add ( idx )
# add last newline
bulk_list . append ( " \n " )
query_str = " \n " . join ( bulk_list )
_ , _ = ElasticWrap ( " _bulk " ) . post ( query_str , ndjson = True )
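        # ndjson=True has the ElasticWrap helper send the newline-delimited payload the bulk API expects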
    def _notify_add(self, idx):
        """send notification for adding videos to download queue"""
        progress = f"{idx + 1}/{len(self.missing_videos)}"
        mess_dict = {
            "status": "message:add",
            "level": "info",
            "title": "Adding new videos to download queue.",
            "message": "Progress: " + progress,
        }
        if idx + 1 == len(self.missing_videos):
            RedisArchivist().set_message("message:add", mess_dict, expire=4)
        else:
            RedisArchivist().set_message("message:add", mess_dict)
        return bulk_list, all_videos_added
        if (idx + 1) % 25 == 0:
print ( " adding to queue progress: " + progress )
@staticmethod
def get_youtube_details ( youtube_id ) :
def get_youtube_details ( self , youtube_id ) :
""" get details from youtubedl for single pending video """
obs = {
" default_search " : " ytsearch " ,
@@ -151,113 +239,29 @@ class PendingList:
        # stop if video is streaming live now
        if vid["is_live"]:
            return False
        # parse response
        seconds = vid["duration"]
        duration_str = DurationConverter.get_str(seconds)

        return self._parse_youtube_details(vid)

    def _parse_youtube_details(self, vid):
        """parse response"""
        vid_id = vid.get("id")
        duration_str = DurationConverter.get_str(vid["duration"])
        if duration_str == "NA":
            print(f"skip extracting duration for: {youtube_id}")
        upload_date = vid["upload_date"]
        upload_dt = datetime.strptime(upload_date, "%Y%m%d")
        published = upload_dt.strftime("%Y-%m-%d")
            print(f"skip extracting duration for: {vid_id}")
        published = datetime.strptime(vid["upload_date"], "%Y%m%d").strftime(
            "%Y-%m-%d"
        )
        # build dict
        youtube_details = {
            "youtube_id": youtube_id,
            "youtube_id": vid_id,
            "channel_name": vid["channel"],
            "vid_thumb_url": vid["thumbnail"],
            "title": vid["title"],
            "channel_id": vid["channel_id"],
            "channel_indexed": vid["channel_id"] in self.all_channels,
            "duration": duration_str,
            "published": published,
" timestamp " : int ( datetime . now ( ) . strftime ( " %s " ) ) ,
}
return youtube_details
@staticmethod
def get_all_pending ( ) :
""" get a list of all pending videos in ta_download """
data = {
" query " : { " match_all " : { } } ,
" sort " : [ { " timestamp " : { " order " : " asc " } } ] ,
}
all_results = IndexPaginate ( " ta_download " , data ) . get_results ( )
all_pending = [ ]
all_ignore = [ ]
for result in all_results :
if result [ " status " ] == " pending " :
all_pending . append ( result )
elif result [ " status " ] == " ignore " :
all_ignore . append ( result )
return all_pending , all_ignore
@staticmethod
def get_all_indexed ( ) :
""" get a list of all videos indexed """
data = {
" query " : { " match_all " : { } } ,
" sort " : [ { " published " : { " order " : " desc " } } ] ,
}
all_indexed = IndexPaginate ( " ta_video " , data ) . get_results ( )
return all_indexed
def get_all_downloaded ( self ) :
""" get a list of all videos in archive """
channel_folders = os . listdir ( self . VIDEOS )
all_channel_folders = ignore_filelist ( channel_folders )
all_downloaded = [ ]
for channel_folder in all_channel_folders :
channel_path = os . path . join ( self . VIDEOS , channel_folder )
videos = os . listdir ( channel_path )
all_videos = ignore_filelist ( videos )
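            # media files are named <YYYYMMDD>_<video_id>_<title>, so chars 9:20 hold the 11-char id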
            youtube_vids = [i[9:20] for i in all_videos]
            for youtube_id in youtube_vids:
                all_downloaded.append(youtube_id)
        return all_downloaded

    def delete_from_pending(self, youtube_id):
        """delete the youtube_id from ta_download"""
        url = f"{self.ES_URL}/ta_download/_doc/{youtube_id}"
        response = requests.delete(url, auth=self.ES_AUTH)
        if not response.ok:
            print(response.text)

    def delete_pending(self, status):
        """delete download queue based on status value"""
        data = {"query": {"term": {"status": {"value": status}}}}
        payload = json.dumps(data)
        url = self.ES_URL + "/ta_download/_delete_by_query"
        headers = {"Content-type": "application/json"}
        response = requests.post(
            url, data=payload, headers=headers, auth=self.ES_AUTH
        )
        if not response.ok:
            print(response.text)

    def ignore_from_pending(self, ignore_list):
        """build the bulk query string"""
        stamp = int(datetime.now().strftime("%s"))
        bulk_list = []
        for youtube_id in ignore_list:
            action = {"update": {"_id": youtube_id, "_index": "ta_download"}}
            source = {"doc": {"status": "ignore", "timestamp": stamp}}
            bulk_list.append(json.dumps(action))
            bulk_list.append(json.dumps(source))
        # add last newline
        bulk_list.append("\n")
        query_str = "\n".join(bulk_list)
        headers = {"Content-type": "application/x-ndjson"}
        url = self.ES_URL + "/_bulk"
        request = requests.post(
            url, data=query_str, headers=headers, auth=self.ES_AUTH
        )
        if not request.ok:
            print(request)
            raise ValueError("failed to set video to ignore")