@ -6,158 +6,15 @@ functionality:
import json
import os
import re
from datetime import datetime
import requests
from bs4 import BeautifulSoup
from home . src . download import queue # partial import
from home . src . download . thumbnails import ThumbManager
from home . src . download . yt_dlp_base import YtWrap
from home . src . es . connect import ElasticWrap , IndexPaginate
from home . src . index . generic import YouTubeItem
from home . src . index . playlist import YoutubePlaylist
from home . src . ta . helper import clean_string , requests_headers
class ChannelScraper:
    """Scrape a channel's about page with bs4 and distil it into a dict.

    Custom scraper; will be able to be integrated into yt-dlp once
    #2237 and #2350 are merged upstream.

    ``soup``, ``yt_json`` and ``json_data`` all start out as ``False`` and
    are filled in by ``get_soup``, ``_extract_yt_json`` and the
    ``_parse_channel_*`` methods respectively.
    """

    # suffixes used in abbreviated subscriber counts, e.g. "1.2K"
    _SUB_MULTIPLIERS = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}

    def __init__(self, channel_id):
        self.channel_id = channel_id
        self.soup = False
        self.yt_json = False
        self.json_data = False

    def get_json(self):
        """Main method: fetch, parse and return the channel dict.

        Returns:
            The ``channel_*`` dict, or ``False`` when the payload carries
            alerts (see ``_is_deactivated``).

        Raises:
            ConnectionError: the about page could not be fetched.
            ValueError: the page has no decodable ``ytInitialData`` payload.
        """
        self.get_soup()
        self._extract_yt_json()
        if self._is_deactivated():
            return False

        self._parse_channel_main()
        self._parse_channel_meta()
        return self.json_data

    def get_soup(self):
        """Download the about page and store the parsed result in ``self.soup``.

        Raises:
            ConnectionError: the response status is not OK.
        """
        print(f"{self.channel_id}: scrape channel data from youtube")
        url = f"https://www.youtube.com/channel/{self.channel_id}/about?hl=en"
        # NOTE(review): presumably pre-answers the cookie consent page - confirm
        cookies = {"CONSENT": "YES+xxxxxxxxxxxxxxxxxxxxxxxxxxx"}
        response = requests.get(
            url, cookies=cookies, headers=requests_headers(), timeout=10
        )
        if not response.ok:
            message = f"{self.channel_id}: failed to extract channel info"
            print(message)
            raise ConnectionError(message)

        self.soup = BeautifulSoup(response.text, "html.parser")

    def _extract_yt_json(self):
        """Find the ytInitialData script in the soup and decode it.

        Raises:
            ValueError: no script in the page body carries the payload, or
                the payload is not valid JSON.
        """
        body = self.soup.find("body")
        scripts = body.find_all("script") if body is not None else []
        for script in scripts:
            payload = self._payload_from_script(str(script))
            if payload is not None:
                self.yt_json = payload
                return

        raise ValueError(f"{self.channel_id}: ytInitialData not found in page")

    @staticmethod
    def _payload_from_script(script_text):
        """Decode the object assigned to ytInitialData in one script tag.

        Returns ``None`` when *script_text* does not contain the assignment.
        """
        _, marker, tail = script_text.partition("var ytInitialData = ")
        if not marker:
            return None

        # raw_decode stops after the first JSON value, so the trailing
        # ";</script>" does not have to be trimmed by hand
        payload, _ = json.JSONDecoder().raw_decode(tail.lstrip())
        return payload

    def _is_deactivated(self):
        """Check for alerts in the payload; any alert counts as deactivated.

        Prints the first alert's text and returns ``True`` when alerts are
        present, ``False`` otherwise.
        """
        alerts = self.yt_json.get("alerts")
        if not alerts:
            return False

        alert_renderer = alerts[0].get("alertRenderer", {})
        alert_text = alert_renderer.get("text", {}).get("simpleText")
        if not alert_text:
            # alert without the expected text field, still report it
            alert_text = "unknown alert"

        print(f"{self.channel_id}: failed to extract, {alert_text}")
        return True

    def _parse_channel_main(self):
        """Build ``self.json_data`` from the header renderer of the payload."""
        main_tab = self.yt_json["header"]["c4TabbedHeaderRenderer"]
        self.json_data = {
            "channel_active": True,
            "channel_last_refresh": int(datetime.now().timestamp()),
            "channel_subs": self._get_channel_subs(main_tab),
            "channel_name": main_tab["title"],
            "channel_banner_url": self._get_thumbnails(main_tab, "banner"),
            "channel_tvart_url": self._get_thumbnails(main_tab, "tvBanner"),
            "channel_id": self.channel_id,
            "channel_subscribed": False,
        }

    @staticmethod
    def _widest_url(thumbnails):
        """Return the url of the widest thumbnail, ``False`` for an empty list."""
        if not thumbnails:
            return False

        return sorted(thumbnails, key=lambda k: k["width"])[-1]["url"]

    @staticmethod
    def _get_thumbnails(main_tab, thumb_name):
        """Return the widest *thumb_name* url from main_tab, else ``False``."""
        try:
            return ChannelScraper._widest_url(
                main_tab[thumb_name]["thumbnails"]
            )
        except KeyError:
            return False

    @staticmethod
    def _get_channel_subs(main_tab):
        """Process main_tab to get the channel subscriber count as int.

        Understands plain numbers and the K / M / B abbreviations
        ("1.2K subscribers" -> 1200). A missing, empty, negative or
        unparsable count yields 0 instead of raising.
        """
        try:
            sub_text_simple = main_tab["subscriberCountText"]["simpleText"]
        except KeyError:
            return 0

        words = sub_text_simple.split()
        if not words:
            return 0

        sub_text = words[0]
        multiplier = ChannelScraper._SUB_MULTIPLIERS.get(sub_text[-1])
        try:
            if multiplier:
                channel_subs = int(round(float(sub_text[:-1]) * multiplier))
            else:
                channel_subs = int(sub_text.replace(",", ""))
        except (ValueError, OverflowError):
            channel_subs = -1

        if channel_subs < 0:
            print(f"{sub_text} not dealt with")
            return 0

        return channel_subs

    def _find_about_tab(self):
        """Return the About tab's metadata renderer, ``{}`` if there is none."""
        renderer = "twoColumnBrowseResultsRenderer"
        for tab in self.yt_json["contents"][renderer]["tabs"]:
            tab_renderer = tab.get("tabRenderer")
            if not tab_renderer or tab_renderer.get("title") != "About":
                continue

            sections = tab_renderer["content"]["sectionListRenderer"]
            items = sections["contents"][0]["itemSectionRenderer"]
            return items["contents"][0]["channelAboutFullMetadataRenderer"]

        return {}

    def _parse_channel_meta(self):
        """Add description, avatar url and view count to ``self.json_data``.

        The view count falls back to 0 when the About tab, its view text
        or any digit in that text is missing.
        """
        meta_tab = self.yt_json["metadata"]["channelMetadataRenderer"]
        thumb_url = self._widest_url(meta_tab["avatar"]["thumbnails"])

        about_tab = self._find_about_tab()
        try:
            channel_views_text = about_tab["viewCountText"]["simpleText"]
        except KeyError:
            channel_views = 0
        else:
            digits = re.sub(r"\D", "", channel_views_text)
            channel_views = int(digits) if digits else 0

        self.json_data.update(
            {
                "channel_description": meta_tab["description"],
                "channel_thumb_url": thumb_url,
                "channel_views": channel_views,
            }
        )
from home . src . ta . helper import clean_string
class YoutubeChannel ( YouTubeItem ) :
@ -166,36 +23,94 @@ class YoutubeChannel(YouTubeItem):
# class-level placeholder; __init__ replaces it with the per-channel document path
es_path = False
# index name, used as the first segment of es_path
index_name = " ta_channel "
# url prefix that build_yt_url joins with the channel id
yt_base = " https://www.youtube.com/channel/ "
# NOTE(review): not referenced in the visible code; presumably yt-dlp
# options consumed by the base class - confirm
yt_obs = {
" extract_flat " : True ,
" allow_playlist_files " : True ,
}
def __init__(self, youtube_id, task=False):
    """Set up the channel item for *youtube_id*.

    Args:
        youtube_id: channel id; passed to the base class and used to
            build the document path ``<index_name>/_doc/<youtube_id>``.
        task: stored unchanged as ``self.task``; defaults to ``False``.
    """
    super().__init__(youtube_id)
    self.es_path = f"{self.index_name}/_doc/{youtube_id}"
    self.all_playlists = False
    self.task = task
def build_yt_url(self):
    """Overwrite base to point at the channel about page.

    Returns:
        ``yt_base`` + ``youtube_id`` + ``/about`` as a single url string.
    """
    return f"{self.yt_base}{self.youtube_id}/about"
# Populate self.json_data, optionally uploading the result.
# upload: when truthy, upload_to_es() is called at the end.
# fallback: forwarded to _video_fallback() when no youtube_meta is available.
def build_json ( self , upload = False , fallback = False ) :
""" get from es or from youtube """
self . get_from_es ( )
# whatever get_from_es() left in json_data wins: nothing else runs
if self . json_data :
return
# NOTE(review): get_from_youtube is called twice back to back, once with
# and once without fallback; the visible get_from_youtube ignores that
# argument, so this looks like leftover from two versions - confirm
self . get_from_youtube ( fallback )
self . get_from_youtube ( )
# no metadata plus a fallback supplied: build from the fallback instead
if not self . youtube_meta and fallback :
self . _video_fallback ( fallback )
else :
self . _process_youtube_meta ( )
self . get_channel_art ( )
if upload :
self . upload_to_es ( )
# NOTE(review): trailing bare return has no effect on the result
return
def get_from_youtube(self, fallback=False):
    """Scrape the channel about page and store the result in json_data.

    ``fallback`` is accepted for call compatibility but is not read here.
    """
    scraper = ChannelScraper(self.youtube_id)
    self.json_data = scraper.get_json()
def _process_youtube_meta(self):
    """Extract the relevant fields of ``self.youtube_meta`` into json_data.

    Reverses ``youtube_meta["thumbnails"]`` in place before the art
    helpers run, then assigns a fresh ``self.json_data`` dict.

    Raises:
        KeyError: ``thumbnails`` or ``uploader`` is missing from the
            metadata.
    """
    meta = self.youtube_meta
    # the art helpers return the first matching thumbnail, so list order
    # decides which one is picked
    # NOTE(review): presumably the source lists thumbnails smallest first - confirm
    meta["thumbnails"].reverse()

    self.json_data = {
        "channel_active": True,
        "channel_description": meta.get("description", False),
        "channel_id": self.youtube_id,
        "channel_last_refresh": int(datetime.now().timestamp()),
        "channel_name": meta["uploader"],
        "channel_subs": meta.get("channel_follower_count") or 0,
        "channel_subscribed": False,
        "channel_tags": self._parse_tags(meta.get("tags")),
        "channel_banner_url": self._get_banner_art(),
        "channel_thumb_url": self._get_thumb_art(),
        "channel_tvart_url": self._get_tv_art(),
        "channel_views": meta.get("view_count", 0),
    }
def _parse_tags ( self , tags ) :
""" parse channel tags """
if not tags :
return False
if not self . json_data :
return
joined = " " . join ( tags )
return [ i . strip ( ) for i in joined . split ( ' " ' ) if i and not i == " " ]
def _get_thumb_art ( self ) :
""" extract thumb art """
for i in self . youtube_meta [ " thumbnails " ] :
if not i . get ( " width " ) :
continue
if i . get ( " width " ) == i . get ( " height " ) :
return i [ " url " ]
return False
def _get_tv_art ( self ) :
""" extract tv artwork """
for i in self . youtube_meta [ " thumbnails " ] :
if i . get ( " id " ) == " avatar_uncropped " :
return i [ " url " ]
if not i . get ( " width " ) :
continue
if i [ " width " ] / / i [ " height " ] < 2 and not i [ " width " ] == i [ " height " ] :
return i [ " url " ]
return False
def _get_banner_art ( self ) :
""" extract banner artwork """
for i in self . youtube_meta [ " thumbnails " ] :
if not i . get ( " width " ) :
continue
if i [ " width " ] / / i [ " height " ] > 5 :
return i [ " url " ]
self . get_channel_art ( )
return False
def _video_fallback ( self , fallback ) :
""" use video metadata as fallback """
@ -209,6 +124,7 @@ class YoutubeChannel(YouTubeItem):
" channel_tvart_url " : False ,
" channel_id " : self . youtube_id ,
" channel_subscribed " : False ,
" channel_tags " : False ,
" channel_description " : False ,
" channel_thumb_url " : False ,
" channel_views " : 0 ,