@ -6,7 +6,6 @@ functionality:
import json
import os
import re
from datetime import datetime
import requests
@ -24,7 +23,7 @@ class YoutubeSubtitle:
self . video = video
self . languages = False
def sub_conf_parse( self ) :
def _ sub_conf_parse( self ) :
""" add additional conf values to self """
languages_raw = self . video . config [ " downloads " ] [ " subtitle " ]
if languages_raw :
@ -32,26 +31,26 @@ class YoutubeSubtitle:
def get_subtitles ( self ) :
""" check what to do """
self . sub_conf_parse( )
self . _ sub_conf_parse( )
if not self . languages :
# no subtitles
return False
relevant_subtitles = [ ]
for lang in self . languages :
user_sub = self . get_user_subtitles( lang )
user_sub = self . _ get_user_subtitles( lang )
if user_sub :
relevant_subtitles . append ( user_sub )
continue
if self . video . config [ " downloads " ] [ " subtitle_source " ] == " auto " :
auto_cap = self . get_auto_caption( lang )
auto_cap = self . _ get_auto_caption( lang )
if auto_cap :
relevant_subtitles . append ( auto_cap )
return relevant_subtitles
def get_auto_caption( self , lang ) :
def _ get_auto_caption( self , lang ) :
""" get auto_caption subtitles """
print ( f " { self . video . youtube_id } - { lang } : get auto generated subtitles " )
all_subtitles = self . video . youtube_meta . get ( " automatic_captions " )
@ -65,7 +64,7 @@ class YoutubeSubtitle:
if not all_formats :
return False
subtitle = [ i for i in all_formats if i [ " ext " ] == " vtt " ] [ 0 ]
subtitle = [ i for i in all_formats if i [ " ext " ] == " json3 " ] [ 0 ]
subtitle . update (
{ " lang " : lang , " source " : " auto " , " media_url " : media_url }
)
@ -88,7 +87,7 @@ class YoutubeSubtitle:
return all_subtitles
def get_user_subtitles( self , lang ) :
def _ get_user_subtitles( self , lang ) :
""" get subtitles uploaded from channel owner """
print ( f " { self . video . youtube_id } - { lang } : get user uploaded subtitles " )
all_subtitles = self . _normalize_lang ( )
@ -102,7 +101,7 @@ class YoutubeSubtitle:
# no user subtitles found
return False
subtitle = [ i for i in all_formats if i [ " ext " ] == " vtt " ] [ 0 ]
subtitle = [ i for i in all_formats if i [ " ext " ] == " json3 " ] [ 0 ]
subtitle . update (
{ " lang " : lang , " source " : " user " , " media_url " : media_url }
)
@ -115,12 +114,13 @@ class YoutubeSubtitle:
for subtitle in relevant_subtitles :
dest_path = os . path . join ( videos_base , subtitle [ " media_url " ] )
source = subtitle [ " source " ]
lang = subtitle . get ( " lang " )
response = requests . get ( subtitle [ " url " ] )
if not response . ok :
print ( f " { self . video . youtube_id } : failed to download subtitle " )
continue
parser = SubtitleParser ( response . text , subtitle. get ( " lang " ) )
parser = SubtitleParser ( response . text , lang, source )
parser . process ( )
subtitle_str = parser . get_subtitle_str ( )
self . _write_subtitle_file ( dest_path , subtitle_str )
@ -145,109 +145,94 @@ class YoutubeSubtitle:
class SubtitleParser :
""" parse subtitle str from youtube """
time_reg = r " ^([0-9] {2} :?) {3} \ .[0-9] {3} --> ([0-9] {2} :?) {3} \ .[0-9] {3} "
stamp_reg = r " <([0-9] {2} :?) {3} \ .[0-9] {3} > "
tag_reg = r " </?c> "
def __init__ ( self , subtitle_str , lang ) :
self . subtitle_str = subtitle_str
def __init__ ( self , subtitle_str , lang , source ) :
self . subtitle_raw = json . loads ( subtitle_str )
self . lang = lang
self . header = False
self . parsed_cue_list = False
self . all_text_lines = False
self . matched = False
self . source = source
self . all_cues = False
def process ( self ) :
""" collection to process subtitle string """
self . _parse_cues ( )
self . _match_text_lines ( )
self . _add_id ( )
self . _timestamp_check ( )
def _parse_cues ( self ) :
""" split into cues """
all_cues = self . subtitle_str . replace ( " \n \n " , " \n " ) . split ( " \n \n " )
self . header = all_cues [ 0 ]
self . all_text_lines = [ ]
self . parsed_cue_list = [ self . _cue_cleaner ( i ) for i in all_cues [ 1 : ] ]
def _cue_cleaner ( self , cue ) :
""" parse single cue """
all_lines = cue . split ( " \n " )
cue_dict = { " lines " : [ ] }
for line in all_lines :
if re . match ( self . time_reg , line ) :
clean = re . search ( self . time_reg , line ) . group ( )
start , end = clean . split ( " --> " )
cue_dict . update ( { " start " : start , " end " : end } )
else :
clean = re . sub ( self . stamp_reg , " " , line )
clean = re . sub ( self . tag_reg , " " , clean )
cue_dict [ " lines " ] . append ( clean )
if clean . strip ( ) and clean not in self . all_text_lines [ - 4 : ] :
# remove immediate duplicates
self . all_text_lines . append ( clean )
return cue_dict
def _match_text_lines ( self ) :
""" match unique text lines with timestamps """
self . matched = [ ]
while self . all_text_lines :
check = self . all_text_lines [ 0 ]
matches = [ i for i in self . parsed_cue_list if check in i [ " lines " ] ]
new_cue = matches [ - 1 ]
new_cue [ " start " ] = matches [ 0 ] [ " start " ]
for line in new_cue [ " lines " ] :
try :
self . all_text_lines . remove ( line )
except ValueError :
continue
""" extract relevant que data """
all_events = self . subtitle_raw . get ( " events " )
if self . source == " auto " :
all_events = self . _flat_auto_caption ( all_events )
self . all_cues = [ ]
for idx , event in enumerate ( all_events ) :
cue = {
" start " : self . _ms_conv ( event [ " tStartMs " ] ) ,
" end " : self . _ms_conv ( event [ " tStartMs " ] + event [ " dDurationMs " ] ) ,
" text " : " " . join ( [ i . get ( " utf8 " ) for i in event [ " segs " ] ] ) ,
" idx " : idx + 1 ,
}
self . all_cues . append ( cue )
self . matched . append ( new_cue )
def _timestamp_check ( self ) :
""" check if end timestamp is bigger than start timestamp """
for idx , cue in enumerate ( self . matched ) :
# this
end = int ( re . sub ( " [^0-9] " , " " , cue . get ( " end " ) ) )
# next
try :
next_cue = self . matched [ idx + 1 ]
except IndexError :
@staticmethod
def _flat_auto_caption ( all_events ) :
""" flatten autocaption segments """
flatten = [ ]
for event in all_events :
if " segs " not in event . keys ( ) :
continue
text = " " . join ( [ i . get ( " utf8 " ) for i in event . get ( " segs " ) ] )
if not text . strip ( ) :
continue
if flatten :
# fix overlapping retiming issue
last_end = flatten [ - 1 ] [ " tStartMs " ] + flatten [ - 1 ] [ " dDurationMs " ]
if event [ " tStartMs " ] < last_end :
joined = flatten [ - 1 ] [ " segs " ] [ 0 ] [ " utf8 " ] + " \n " + text
flatten [ - 1 ] [ " segs " ] [ 0 ] [ " utf8 " ] = joined
continue
start_next = int ( re . sub ( " [^0-9] " , " " , next_cue . get ( " start " ) ) )
if end > start_next :
self . matched [ idx ] [ " end " ] = next_cue . get ( " start " )
event . update ( { " segs " : [ { " utf8 " : text } ] } )
flatten . append ( event )
def _add_id ( self ) :
""" add id to matched cues """
for idx , _ in enumerate ( self . matched ) :
self . matched [ idx ] [ " id " ] = idx + 1
return flatten
@staticmethod
def _ms_conv ( ms ) :
""" convert ms to timestamp """
hours = str ( ( ms / / ( 1000 * 60 * 60 ) ) % 24 ) . zfill ( 2 )
minutes = str ( ( ms / / ( 1000 * 60 ) ) % 60 ) . zfill ( 2 )
secs = str ( ( ms / / 1000 ) % 60 ) . zfill ( 2 )
millis = str ( ms % 1000 ) . zfill ( 3 )
return f " { hours } : { minutes } : { secs } . { millis } "
def get_subtitle_str ( self ) :
""" stitch cues and return processed new string """
new_subtitle_str = self . header + " \n \n "
""" create vtt text str from cues """
subtitle_str = f " WEBVTT \n Kind: captions \n Language: { self . lang } "
for cue in self . matched :
timestamp = f " { cue . get ( ' start ' ) } --> { cue . get ( ' end ' ) } "
lines = " \n " . join ( cue . get ( " lines " ) )
cue_text = f " { cue . get ( ' id ' ) } \n { timestamp } \n { lines } \n \n "
new_subtitle_str = new_subtitle_str + cue_text
for cue in self . all_cues :
stamp = f " { cue . get ( ' start ' ) } --> { cue . get ( ' end ' ) } "
cue_text = f " \n \n { cue . get ( ' idx ' ) } \n { stamp } \n { cue . get ( ' text ' ) } "
subtitle_str = subtitle_str + cue_text
return new_ subtitle_str
return subtitle_str
def create_bulk_import ( self , video , source ) :
""" process matched for es import """
""" subtitle lines for es import """
documents = self . _create_documents ( video , source )
bulk_list = [ ]
channel = video . json_data . get ( " channel " )
document = {
for document in documents :
document_id = document . get ( " subtitle_fragment_id " )
action = { " index " : { " _index " : " ta_subtitle " , " _id " : document_id } }
bulk_list . append ( json . dumps ( action ) )
bulk_list . append ( json . dumps ( document ) )
bulk_list . append ( " \n " )
query_str = " \n " . join ( bulk_list )
return query_str
def _create_documents ( self , video , source ) :
""" process documents """
documents = self . _chunk_list ( video . youtube_id )
channel = video . json_data . get ( " channel " )
meta_dict = {
" youtube_id " : video . youtube_id ,
" title " : video . json_data . get ( " title " ) ,
" subtitle_channel " : channel . get ( " channel_name " ) ,
@ -257,26 +242,35 @@ class SubtitleParser:
" subtitle_source " : source ,
}
for match in self . matched :
match_id = match . get ( " id " )
document_id = f " { video . youtube_id } - { self . lang } - { match_id } "
action = { " index " : { " _index " : " ta_subtitle " , " _id " : document_id } }
document . update (
{
" subtitle_fragment_id " : document_id ,
" subtitle_start " : match . get ( " start " ) ,
" subtitle_end " : match . get ( " end " ) ,
" subtitle_index " : match_id ,
" subtitle_line " : " " . join ( match . get ( " lines " ) ) ,
_ = [ i . update ( meta_dict ) for i in documents ]
return documents
def _chunk_list ( self , youtube_id ) :
""" join cues for bulk import """
chunk_list = [ ]
chunk = { }
for cue in self . all_cues :
if chunk :
text = f " { chunk . get ( ' subtitle_line ' ) } { cue . get ( ' text ' ) } \n "
chunk [ " subtitle_line " ] = text
else :
idx = len ( chunk_list ) + 1
chunk = {
" subtitle_index " : idx ,
" subtitle_line " : cue . get ( " text " ) ,
" subtitle_start " : cue . get ( " start " ) ,
}
)
bulk_list . append ( json . dumps ( action ) )
bulk_list . append ( json . dumps ( document ) )
bulk_list . append ( " \n " )
query_str = " \n " . join ( bulk_list )
chunk [ " subtitle_fragment_id " ] = f " { youtube_id } - { self . lang } - { idx } "
return query_str
if cue [ " idx " ] % 5 == 0 :
chunk [ " subtitle_end " ] = cue . get ( " end " )
chunk_list . append ( chunk )
chunk = { }
return chunk_list
class YoutubeVideo ( YouTubeItem , YoutubeSubtitle ) :